In [None]:
try:
  import requests
  import json
  import csv
  import pandas as pd
  from io import StringIO
  from pyspark.sql.types import *
  from pyspark.sql import SparkSession
  from pyspark.sql.functions import col, to_timestamp
  # Create a SparkSession object
  spark = SparkSession.builder.getOrCreate()
except Exception as e:
  !pip install pyspark
  import requests
  import json
  import csv
  import pandas as pd
  from io import StringIO
  from pyspark.sql.types import *
  from pyspark.sql import SparkSession
  from pyspark.sql.functions import col, to_timestamp
  # Create a SparkSession object
  spark = SparkSession.builder.getOrCreate()

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=87e7aac36ba3c65e0d1f209bb40fe0fbc600e467a6f23161e6df0422ee3cad94
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
#get data from thingspeak
# Downloads the channel feed as CSV, loads it into pandas (`df`), mirrors it
# into Spark (`sparkdf`) and registers it as the temp view "rawTable".
import os

channelID = '2339734'
# NOTE(review): credentials should not live in the notebook. The key is read
# from the environment when available; the literal is only a fallback so the
# notebook keeps working unchanged where the variable is not set.
readAPIKey = os.environ.get('THINGSPEAK_READ_API_KEY', 'ISX149V3BH4ZXQU3')

try:
  # The request is made inside the try block so that connection errors and
  # timeouts are reported the same way as parsing/Spark errors below.
  # Without a timeout, requests may wait indefinitely on a stalled server.
  response = requests.get(
      f'https://api.thingspeak.com/channels/{channelID}/feeds.csv',
      params={'api_key': readAPIKey},
      timeout=30,
  )
  if response.status_code != 200:
    print("Error!!!")
    print(response.status_code)
  else:
    df = pd.read_csv(StringIO(response.text))
    print(response.text)
    display(df)

    sparkdf = spark.createDataFrame(df)
    sparkdf.printSchema()
    display(sparkdf)
    sparkdf.createOrReplaceTempView("rawTable")
except Exception as e:
  # Best-effort cell: report the problem instead of aborting the notebook.
  print(e)


created_at,entry_id,field1,field2,field3
2023-11-11 12:15:41 UTC,1,SlotID,Availability,Timestamp
2023-11-11 12:18:32 UTC,2,SlotID,Availability,Timestamp
2023-11-11 12:18:49 UTC,3,SlotID,Availability,Timestamp
2023-11-11 12:20:49 UTC,4,SlotID,Availability,Timestamp
2023-11-11 12:31:37 UTC,5,SlotID,Availability,Timestamp
2023-11-11 12:32:17 UTC,6,IR1,0,2000-0-0T45:162:0
2023-11-11 13:07:21 UTC,7,IR1,0,2000-0-0T45:162:0
2023-11-11 13:31:15 UTC,8,IR1,2080,2000-0-8T45:162:0



Unnamed: 0,created_at,entry_id,field1,field2,field3
0,2023-11-11 12:15:41 UTC,1,SlotID,Availability,Timestamp
1,2023-11-11 12:18:32 UTC,2,SlotID,Availability,Timestamp
2,2023-11-11 12:18:49 UTC,3,SlotID,Availability,Timestamp
3,2023-11-11 12:20:49 UTC,4,SlotID,Availability,Timestamp
4,2023-11-11 12:31:37 UTC,5,SlotID,Availability,Timestamp
5,2023-11-11 12:32:17 UTC,6,IR1,0,2000-0-0T45:162:0
6,2023-11-11 13:07:21 UTC,7,IR1,0,2000-0-0T45:162:0
7,2023-11-11 13:31:15 UTC,8,IR1,2080,2000-0-8T45:162:0


root
 |-- created_at: string (nullable = true)
 |-- entry_id: long (nullable = true)
 |-- field1: string (nullable = true)
 |-- field2: string (nullable = true)
 |-- field3: string (nullable = true)



DataFrame[created_at: string, entry_id: bigint, field1: string, field2: string, field3: string]

In [None]:
#filter data
# Project the three columns of interest out of "rawTable" under the names
# SlotID / Availability / Timestamp and publish the result as the temp view
# "Table".
query = (
    'SELECT field1 AS SlotID, field2 AS Availability, '
    'created_at AS Timestamp FROM rawTable'
)
filterdf = spark.sql(query)
filterdf.createOrReplaceTempView("Table")

# Read the view back through SQL and print it to confirm the registration.
table_preview = spark.sql('SELECT * FROM Table')
table_preview.show()

+------+------------+--------------------+
|SlotID|Availability|           Timestamp|
+------+------------+--------------------+
|SlotID|Availability|2023-11-11 12:15:...|
|SlotID|Availability|2023-11-11 12:18:...|
|SlotID|Availability|2023-11-11 12:18:...|
|SlotID|Availability|2023-11-11 12:20:...|
|SlotID|Availability|2023-11-11 12:31:...|
|   IR1|           0|2023-11-11 12:32:...|
|   IR1|           0|2023-11-11 13:07:...|
|   IR1|        2080|2023-11-11 13:31:...|
+------+------------+--------------------+



In [None]:
#latest data
# For each SlotID keep only the row(s) whose Timestamp equals the maximum
# Timestamp recorded for that same SlotID (correlated scalar subquery).
opquery = (
    "SELECT SlotID, Availability FROM Table AS t1\n"
    "WHERE Timestamp = (SELECT MAX(Timestamp) FROM Table AS t2 WHERE t1.SlotID = t2.SlotID)"
)
latestdf = spark.sql(opquery)
latestdf.show()

+------+------------+
|SlotID|Availability|
+------+------------+
|   IR1|        2080|
|SlotID|Availability|
+------+------------+



In [None]:
#slot allocation to user
# Asks for a SlotID, checks that it exists and that its most recent reading
# has Availability = 0, and if so marks that reading as taken (1) in the
# temp view "Table". Finally prints the latest reading of every slot.
#
# The user's text is bound as a literal through the DataFrame API rather
# than spliced into SQL strings, so quotes in the input cannot alter the
# query. Spark SQL cannot run UPDATE against a temp view, so the change is
# applied by rebuilding the view with a conditional column.
from pyspark.sql import functions as F

userip = input("Enter the SlotID you want: ").strip()
print(userip,"allocating...")

slots = spark.sql("SELECT * FROM Table")
wanted = F.col("SlotID") == F.lit(userip)

#check if slot exists
slotexist = slots.filter(wanted).agg(F.count("SlotID").alias("slots")).collect()[0]
print(slotexist)
if slotexist['slots']>0:
  #slot exists
  # Most recent Timestamp recorded for the requested slot.
  latest_ts = slots.filter(wanted).agg(F.max("Timestamp").alias("latest")).collect()[0]['latest']
  newest_row = wanted & (F.col("Timestamp") == F.lit(latest_ts))

  #check if slot is available
  checkresult = (
      slots.filter(newest_row & (F.col("Availability") == 0))
      .agg(F.count("SlotID").alias("count"))
      .collect()[0]
  )
  if checkresult['count']>0:
    #slot available
    #updating table
    # Cast the new value to the column's existing type so the schema of
    # "Table" does not change.
    availability_type = slots.schema["Availability"].dataType
    updated = slots.withColumn(
        "Availability",
        F.when(newest_row, F.lit(1).cast(availability_type)).otherwise(F.col("Availability")),
    )
    # localCheckpoint materialises the rows and cuts the lineage back to the
    # old view, so the view can safely be replaced by data derived from it.
    updated.localCheckpoint().createOrReplaceTempView("Table")
    print("Allocated.")
  else:
    print("Slot ",userip," is not available!")
else:
  print("Slot ",userip," doesnot exist!")

#updated latest data
opquery = """SELECT SlotID, Availability FROM Table AS t1
WHERE Timestamp = (SELECT MAX(Timestamp) FROM Table AS t2 WHERE t1.SlotID = t2.SlotID)"""
latestdf = spark.sql(opquery)
latestdf.show()

Enter the SlotID you want: IR1
IR1 allocating...
Row(slots=3)
Slot  IR1  is not available!
+------+------------+
|SlotID|Availability|
+------+------------+
|   IR1|        2080|
|SlotID|Availability|
+------+------------+



In [None]:
#save data as csv
# Mounts Google Drive and writes the pandas frame `df` to SPSIRDATA.csv.
# NOTE(review): this saves `df`, not the Spark view "Table", so any slot
# allocation made in the notebook is not part of the file. Confirm that this
# is intended.
import os
from google.colab import drive

drive.mount('/content/drive')
folderpath = '/content/drive/My Drive/RP1data'
# to_csv does not create missing directories; make sure the target exists.
os.makedirs(folderpath, exist_ok=True)
df.to_csv(f'{folderpath}/SPSIRDATA.csv', index = False)
print("csv file saved in path ",folderpath)

Mounted at /content/drive
csv file saved in path  /content/drive/My Drive/RP1data
