## Install and Setup a Spark session.

In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=80875daed0effbc6464a4889570293d020447a0ebda67008db3bd4a0c8fad02f
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("Weather Analysis").getOrCreate()

## <b>1 <span style='color:#0386f7de'>|</span> Reading and Understanding the Datasets</b>

In [8]:
df_weather = spark.read.csv('/content/LCD_sample.csv', header=True, inferSchema=True)
df_weather.show(5)


+----------+--------------------+---------+--------+---------+-------------+----------+-------------------+----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+----------------------+---------------+-------------------+-------------------+---------------------+----------------------+--------------------+----------------------+-------------------+----------------------+------------------------------+------------------------------+------------------------------+------------------------------------------+----------------------------+-------------------------------+------------------------------+----------------------+----------------------+------------+-----------+------------------+-------------+--------------+---------------------------+----------------------------+---------------------+------------------+-----------------+-------------------

## <b>2 <span style='color:#0386f7de'>|</span> Data Cleaning</b>

In [9]:
df_weather.count()

772

**Indeed non-null and non-NaN values**

In [11]:
from pyspark.sql.functions import isnan, when, count, col

# Count non-NaN values for each field
df_weather.select([count(when(~isnan(c) & ~col(c).isNull(), c)).alias(c) for c in ["HourlyDryBulbTemperatureF", "HourlyRelativeHumidity"]]).show()


+-------------------------+----------------------+
|HourlyDryBulbTemperatureF|HourlyRelativeHumidity|
+-------------------------+----------------------+
|                      742|                   742|
+-------------------------+----------------------+



**Check for Empty Strings**

In [26]:
df = df_weather.withColumn("HourlyDryBulbTemperatureF", when(col("HourlyDryBulbTemperatureF") == "", None).otherwise(col("HourlyDryBulbTemperatureF")))
df = df_weather.withColumn("HourlyRelativeHumidity", when(col("HourlyRelativeHumidity") == "", None).otherwise(col("HourlyRelativeHumidity")))
df

DataFrame[STATION: string, STATION_NAME: string, ELEVATION: double, LATITUDE: double, LONGITUDE: double, DATE: string, reportType: string, HourlySkyConditions: string, HourlyVisibility: double, HourlyPresentWeatherType: string, HourlyDryBulbTemperatureF: int, HourlyDryBulbTemperatureC: double, HourlyWetBulbTemperatureF: int, HourlyWetBulbTemperatureC: double, HourlyDewPointTemperatureF: int, HourlyDewPointTemperatureC: double, HourlyRelativeHumidity: int, HourlyWindSpeed: int, HourlyWindDirection: string, HourlyWindGustSpeed: int, HourlyStationPressure: double, HourlyPressureTendency: int, HourlyPressureChange: double, HourlySeaLevelPressure: string, HourlyPrecipitation: string, HourlyAltimeterSetting: double, DailyMaximumDryBulbTemperature: int, DailyMinimumDryBulbTemperature: int, DailyAverageDryBulbTemperature: int, DailyDepartureFromNormalAverageTemperature: double, DailyAverageRelativeHumidity: int, DailyAverageDewPointTemperature: string, DailyAverageWetBulbTemperature: int, Dail

**Fill Missing Values**

In [29]:
df = df_weather.na.fill({"HourlyDryBulbTemperatureF": 0, "HourlyRelativeHumidity": 0})

**Create our pipeline**

In [30]:
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=["HourlyDryBulbTemperatureF", "HourlyRelativeHumidity"],
    outputCols=["HourlyDryBulbTemperatureF_imputed", "HourlyRelativeHumidity_imputed"]
).setStrategy("median")


In [31]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, VectorAssembler, StandardScaler, OneHotEncoder, StringIndexer

assembler = VectorAssembler(
    inputCols=["HourlyDryBulbTemperatureF_imputed", "HourlyRelativeHumidity_imputed"],
    outputCol="features"
)

# Define StandardScaler to scale the numerical features
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withStd=True,
    withMean=False
)

In [32]:
# Define the pipeline which consists of stages
pipeline = Pipeline(stages=[imputer, assembler, scaler])

# Now you can fit and transform your pipeline
pipeline_model = pipeline.fit(df)
df_transformed = pipeline_model.transform(df)

In [33]:
# Show the results of the transformation
df_transformed.show()

+----------+--------------------+---------+--------+---------+--------------+----------+--------------------+----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+----------------------+---------------+-------------------+-------------------+---------------------+----------------------+--------------------+----------------------+-------------------+----------------------+------------------------------+------------------------------+------------------------------+------------------------------------------+----------------------------+-------------------------------+------------------------------+----------------------+----------------------+------------+-----------+------------------+-------------+--------------+---------------------------+----------------------------+---------------------+------------------+-----------------+-----------------