-
Notifications
You must be signed in to change notification settings - Fork 1
/
p8o_wizfi_netman.py
179 lines (155 loc) · 6.22 KB
/
p8o_wizfi_netman.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# .';:cc;.
# .,',;lol::c.
# ;';lddddlclo
# lcloxxoddodxdool:,.
# cxdddxdodxdkOkkkkkkkd:.
# .ldxkkOOOOkkOO000Okkxkkkkx:.
# .lddxkkOkOOO0OOO0000Okxxxxkkkk:
# 'ooddkkkxxkO0000KK00Okxdoodxkkkko
# .ooodxkkxxxOO000kkkO0KOxolooxkkxxkl
# lolodxkkxxkOx,. .lkdolodkkxxxO.
# doloodxkkkOk .... .,cxO;
# ddoodddxkkkk: ,oxxxkOdc'..o'
# :kdddxxxxd, ,lolccldxxxkkOOOkkkko,
# lOkxkkk; :xkkkkkkkkOOO000OOkkOOk.
# ;00Ok' 'O000OO0000000000OOOO0Od.
# .l0l.;OOO000000OOOOOO000000x,
# .'OKKKK00000000000000kc.
# .:ox0KKKKKKK0kdc,.
# ...
#
# Author: peppe8o
# Date: Dec 2nd, 2022
# Version: 1.0
# blog: https://peppe8o.com
from machine import UART, Pin
import time
# WizFi360 settings for WizFi360-EVB-Pico board
PORT = 1
RX = 5
TX = 4
resetpin = 20
rtspin = False
# UART communication initialization
uart = UART(PORT, 115200, tx= Pin(TX), rx= Pin(RX), txbuf=1024, rxbuf=1024*2)
def UARTcomm(command, retry=3): # This controls one UART command exection
timeout = 5000
try_id = 0
uart.write(command + '\r\n')
return_line = b''
start = time.ticks_ms()
try:
while not "OK" in return_line:
while not uart.any():
if time.ticks_diff(time.ticks_ms(), start) > timeout: return "Timeout error!"
pass
resp = uart.readline()
return_line = return_line + resp
result = return_line.decode('utf-8').strip('\r\n')
if try_id > 0: print("Warning: tried " + try_id + " times, but this could not be a problem...")
return result
except:
pass
def scanAP(): # Scans the available Access Points and prints the related list
enc=["OPEN","WEP","WPA_PSK","WPA2_PSK","WPA_WPA2_PSK","WPA2_ENTERPRISE","WPA3_PSK","WPA2_WPA3_PSK","WAPI_PSK"]
scan=UARTcomm("AT+CWLAP")
AccessPoints = []
for line in scan.split("\r\n"):
if line[0:8] == "+CWLAP:(":
AccessPoint = line[8:-1].split(",")
for i, val in enumerate(AccessPoint):
AccessPoint[i] = str(val)
try:
AccessPoint[i] = int(AccessPoint[i])
except ValueError:
AccessPoint[i] = AccessPoint[i].strip('"')
AccessPoints.append(AccessPoint)
print("List of available APs:\r\n")
for ap in AccessPoints:
print("SSID: "+ap[1])
print("Encryption Method: "+enc[ap[0]])
print("Signal strength: "+str(ap[2]))
print("AP MAC Address: "+ap[3])
print("Channel Number: "+str(ap[4]))
print("WPS (0-disabled, 1-enabled): "+str(ap[5]))
print("\r\n")
def APconnect(ssid,pwd): # Connects to WiFi
print("Connecting to WiFi:")
result=UARTcomm('AT+CWJAP_CUR="'+ssid+'","'+pwd+'"')
print(result)
print("Retrieving IP and Mac Address:")
set_cmd=UARTcomm("AT+CIFSR")
for line in set_cmd.split("\r\n"):
if line[0:13] == "+CIFSR:STAIP,": ip=line[14:-1]
if line[0:14] == "+CIFSR:STAMAC,": mac=line[15:-1]
return ip,mac
def ping_test(dest): # performs a PING test
ping_result = UARTcomm('AT+PING="'+dest+'"')
res = ping_result.split("\r\n")
res_len = len(res)
if res[res_len-1] == "OK":
res_time = [idx for idx in res if idx.startswith('+')]
print("Ping test to " + dest + " successful.")
print("Response get in " + res_time[0] + " milliseconds")
return
else:
print("Ping test to " + dest + " failed!")
return
# ************************** MQTT SETUP ****************************
def mqtt_userinfo_config(username: str, password: str, client_id: str, keep_alive: int) -> bool:
"""Set the configuration of MQTT connection. """
mqtt_cfg = UARTcomm("AT+MQTTSET=" + '"' + str(username) + '","' + str(password) + '","' + str(client_id) + '",' + str(keep_alive))
res = mqtt_cfg.split("\r\n")
res_len = len(res)
if res[res_len-1] == "OK":
print("MQTT user info set")
return True
else:
print("Error occurred setting MQTT user info")
return False
def mqtt_set_topic(publish_topic: str, sub_topic: str) -> bool:
""" Registers a MQTT topics."""
mqtt_set_topic = UARTcomm("AT+MQTTTOPIC=" + '"'+str(publish_topic)+ '","' + str(sub_topic)+ '"')
res = mqtt_set_topic.split("\r\n")
res_len = len(res)
if res[res_len-1] == "OK":
print("Registered on topic "+publish_topic)
return True
else:
print("Error occurred registering on topic "+publish_topic)
return False
def mqtt_connect(auth_enable:int, broker_ip: str, broker_port: int, link_id: Optional[int] = None) -> bool:
"""Initiates connection with the MQTT Broker."""
if not link_id:
mqtt_con = UARTcomm("AT+MQTTCON=" + str(auth_enable) + ',"' + str(broker_ip) + '",' + str(broker_port))
else:
mqtt_con = UARTcomm("AT+MQTTCON=" + str(link_id) + "," + str(auth_enable) + ',"' + str(broker_ip) + '",' + str(broker_port))
res = mqtt_con.split("\r\n")
res_len = len(res)
if res[res_len-1] == "OK":
print("Connected to Broker " + broker_ip)
return True
else:
print("Error occurred connecting to Broker " + broker_ip)
return False
def mqtt_publish(message: bytes) -> bool:
"""Publishes a message to a topic provided."""
mqtt_pub = UARTcomm("AT+MQTTPUB=" + '"' + str(message) + '"')
res = mqtt_pub.split("\r\n")
res_len = len(res)
if res[res_len-1] == "OK":
print("Message: "+message+" sent")
return True
else:
print("Error occurred sending message "+message)
return False
def mqtt_disconnect() -> bool:
""" Disconnects the MiniMQTT client from the MQTT broker."""
mqtt_disc = UARTcomm("AT+MQTTDIS")
res = mqtt_disc.split("\r\n")
res_len = len(res)
if res[res_len-1] == "CLOSE":
print("Disconnected from MQTT broker")
return True
else:
return False