1
1
# Copyright (c) Microsoft Corporation.
2
2
# Licensed under the MIT license.
3
3
import re
4
- from typing import Optional , Type
4
+ from typing import List , Optional , Type
5
5
6
+ from lisa .base_tools import Cat
6
7
from lisa .executable import Tool
7
8
from lisa .tools .lsblk import Lsblk
8
9
from lisa .tools .rm import Rm
9
- from lisa .util import find_patterns_in_lines
10
+ from lisa .util import (
11
+ LisaException ,
12
+ find_patterns_groups_in_lines ,
13
+ find_patterns_in_lines ,
14
+ )
10
15
11
16
12
17
class SwapOn (Tool ):
@@ -28,6 +33,14 @@ def command(self) -> str:
28
33
29
34
30
35
class Swap (Tool ):
36
+ # Filename Type Size Used Priority
37
+ # /dev/sdb2 partition 1020 0 -2
38
+ # /swapfile file 200M 15M -3
39
+ # /mnt/swapfile file 2097148 0 -4
40
+ _SWAPS_PATTERN = re .compile (
41
+ r"(?P<filename>\S+)\s+(?P<type>\S+)\s+(?P<size>\d+)\w?\s+(?P<used>\d+)\w?\s+(?P<priority>-?\d+)" # noqa: E501
42
+ )
43
+
31
44
@property
32
45
def command (self ) -> str :
33
46
raise NotImplementedError ()
@@ -53,6 +66,26 @@ def is_swap_enabled(self) -> bool:
53
66
lsblk = self .node .tools [Lsblk ].run ().stdout
54
67
return "SWAP" in lsblk
55
68
69
+ def get_swap_partitions (self ) -> List [str ]:
70
+ # run 'cat /proc/swaps' or 'swapon -s' and parse the output
71
+ # The output is in the following format:
72
+ # <Filename> <Type> <Size> <Used> <Priority>
73
+ cat = self .node .tools [Cat ]
74
+ swap_result = cat .run ("/proc/swaps" , shell = True , sudo = True )
75
+ if swap_result .exit_code != 0 :
76
+ # Try another way to get swap information
77
+ swap_result = self .node .tools [SwapOn ].run ("-s" )
78
+ if swap_result .exit_code != 0 :
79
+ raise LisaException ("Failed to get swap information" )
80
+
81
+ output = swap_result .stdout
82
+ swap_parts : List [str ] = []
83
+ swap_entries = find_patterns_groups_in_lines (output , [self ._SWAPS_PATTERN ])[0 ]
84
+ for swap_entry in swap_entries :
85
+ if swap_entry ["type" ] == "partition" :
86
+ swap_parts .append (swap_entry ["filename" ])
87
+ return swap_parts
88
+
56
89
def create_swap (
57
90
self , path : str = "/tmp/swap" , size : str = "1M" , count : int = 1024
58
91
) -> None :
@@ -84,3 +117,19 @@ def is_swap_enabled(self) -> bool:
84
117
return True
85
118
86
119
return False
120
+
121
+ def get_swap_partitions (self ) -> List [str ]:
122
+ # run 'swapinfo -k' and parse the output
123
+ # The output is in the following format:
124
+ # <Device> <1K-blocks> <Used> <Avail> <Capacity>
125
+ swap_result = self .run ("-k" )
126
+ if swap_result .exit_code != 0 :
127
+ raise LisaException ("Failed to get swap information" )
128
+
129
+ output = swap_result .stdout
130
+ swap_parts : List [str ] = []
131
+ swap_entries = find_patterns_groups_in_lines (output , [self ._SWAP_ENTRIES ])[0 ]
132
+ # FreeBSD doesn't have swap files, only partitions
133
+ for swap_entry in swap_entries :
134
+ swap_parts .append (swap_entry ["device" ])
135
+ return swap_parts
0 commit comments