In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 38 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 43.0 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=bf9eda8a7dd0ab0235fe8606ed9b4c04eae735dc6ec7d1312b1d810f97b15e8d
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [2]:
from pyspark import SparkContext

from pyspark.sql import SparkSession 
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [3]:
spark = SparkSession.builder.appName('de').getOrCreate()

df = spark.read.csv('/content/data.csv', header = True, inferSchema=True)
df.printSchema()

root
 |-- sensor_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- sensor_type: string (nullable = true)
 |-- reading: integer (nullable = true)



In [19]:
df.show()

+---------+-------------------+-----------+-------+
|sensor_id|          timestamp|sensor_type|reading|
+---------+-------------------+-----------+-------+
|     28 1|2021-04-30 00:50:30|temperature|   2000|
|     29 2|2021-04-30 00:50:30|   humidity|     60|
|     30 3|2021-04-30 00:50:40|temperature|   2500|
|     31 4|2021-04-30 00:50:40|   humidity|     75|
|     32 5|2021-04-30 00:50:40|   humidity|    101|
|     33 6|2021-04-30 00:50:40|   humidity|    -10|
+---------+-------------------+-----------+-------+



In [22]:
@udf(returnType=IntegerType())
def change_reading(t,r):
    if t == 'temperature':
        return int(r / 100)
    else:
        return int(r)


df2 = df.withColumn("reading", change_reading(col('sensor_type'), col('reading')))
df2.show()


+---------+-------------------+-----------+-------+
|sensor_id|          timestamp|sensor_type|reading|
+---------+-------------------+-----------+-------+
|     28 1|2021-04-30 00:50:30|temperature|     20|
|     29 2|2021-04-30 00:50:30|   humidity|     60|
|     30 3|2021-04-30 00:50:40|temperature|     25|
|     31 4|2021-04-30 00:50:40|   humidity|     75|
|     32 5|2021-04-30 00:50:40|   humidity|    101|
|     33 6|2021-04-30 00:50:40|   humidity|    -10|
+---------+-------------------+-----------+-------+



In [31]:
df3 = df2.filter((col('sensor_type') == 'humidity') &
                            ~(col('reading').between(0,100))).index
df3.show()

+---------+-------------------+-----------+-------+
|sensor_id|          timestamp|sensor_type|reading|
+---------+-------------------+-----------+-------+
|     32 5|2021-04-30 00:50:40|   humidity|    101|
|     33 6|2021-04-30 00:50:40|   humidity|    -10|
+---------+-------------------+-----------+-------+



In [32]:
df4 = df2.toPandas()
df4.head()

Unnamed: 0,sensor_id,timestamp,sensor_type,reading
0,28 1,2021-04-30 00:50:30,temperature,20
1,29 2,2021-04-30 00:50:30,humidity,60
2,30 3,2021-04-30 00:50:40,temperature,25
3,31 4,2021-04-30 00:50:40,humidity,75
4,32 5,2021-04-30 00:50:40,humidity,101


In [33]:
b = df4.loc[(df4['sensor_type']=='humidity') & ((df4['reading'] > 100) | (df4['reading'] < 0))].index
df4.drop(b)

Unnamed: 0,sensor_id,timestamp,sensor_type,reading
0,28 1,2021-04-30 00:50:30,temperature,20
1,29 2,2021-04-30 00:50:30,humidity,60
2,30 3,2021-04-30 00:50:40,temperature,25
3,31 4,2021-04-30 00:50:40,humidity,75


In [34]:
df4['DP'] = 25 - ((100 - df4['reading']) /5)  # default T = 25
df4.head()

Unnamed: 0,sensor_id,timestamp,sensor_type,reading,DP
0,28 1,2021-04-30 00:50:30,temperature,20,9.0
1,29 2,2021-04-30 00:50:30,humidity,60,17.0
2,30 3,2021-04-30 00:50:40,temperature,25,10.0
3,31 4,2021-04-30 00:50:40,humidity,75,20.0
4,32 5,2021-04-30 00:50:40,humidity,101,25.2
