### Basic Data Analysis using pyspark

In [25]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, isnan, when, count, udf, year, month, to_date, mean
import pyspark.sql.functions as F
import seaborn as sns
import matplotlib.pyplot as plt


spark = SparkSession.builder.getOrCreate()
print(spark)

<pyspark.sql.session.SparkSession object at 0x11a2707f0>


In [26]:
def load_dataframe(filename):
    df = spark.read.format('csv').options(header='true').load(filename)
    return df

#creating a dataframe
df = load_dataframe('weatherHistory.csv')
df.limit(5).show()

+--------------------+-------------+-----------+-----------------+------------------------+--------+------------------+----------------------+------------------+----------+--------------------+--------------------+
|      Formatted Date|      Summary|Precip Type|  Temperature (C)|Apparent Temperature (C)|Humidity| Wind Speed (km/h)|Wind Bearing (degrees)|   Visibility (km)|Loud Cover|Pressure (millibars)|       Daily Summary|
+--------------------+-------------+-----------+-----------------+------------------------+--------+------------------+----------------------+------------------+----------+--------------------+--------------------+
|2006-04-01 00:00:...|Partly Cloudy|       rain|9.472222222222221|      7.3888888888888875|    0.89|           14.1197|                 251.0|15.826300000000002|       0.0|             1015.13|Partly cloudy thr...|
|2006-04-01 01:00:...|Partly Cloudy|       rain|9.355555555555558|       7.227777777777776|    0.86|           14.2646|                 259.

                                                                                

In [27]:
df.printSchema()

root
 |-- Formatted Date: string (nullable = true)
 |-- Summary: string (nullable = true)
 |-- Precip Type: string (nullable = true)
 |-- Temperature (C): string (nullable = true)
 |-- Apparent Temperature (C): string (nullable = true)
 |-- Humidity: string (nullable = true)
 |-- Wind Speed (km/h): string (nullable = true)
 |-- Wind Bearing (degrees): string (nullable = true)
 |-- Visibility (km): string (nullable = true)
 |-- Loud Cover: string (nullable = true)
 |-- Pressure (millibars): string (nullable = true)
 |-- Daily Summary: string (nullable = true)



In [28]:
def rename_multiple_columns(df, columns):
    if isinstance(columns, dict):
        for old_name, new_name in columns.items():
            df = df.withColumnRenamed(old_name, new_name)
        return df
    else:
        raise ValueError("columns need to be in dict format {'existing_name_a':'new_name_a'}")

dict_columns = {
    "Formatted Date": "date",
    "Summary": "Summary",
    "Precip Type": "precip",
    "Temperature (C)": "Temperature",
    "Apparent Temperature (C)": "app_Temperature",
    "Humidity": "Humidity",
    "Wind Speed (km/h)": "Wind_Speed",
    "Wind Bearing (degrees)": "Wind_Bearing",
    "Visibility (km)": "Visibility",
    "Loud Cover": "Loud_Cover",
    "Pressure (millibars)": "Pressure",
    "Daily Summary": "Daily_Summary"
}

# Assuming df_matches is a PySpark DataFrame
df_renamed = rename_multiple_columns(df, dict_columns)


In [29]:
df_renamed.printSchema()

root
 |-- date: string (nullable = true)
 |-- Summary: string (nullable = true)
 |-- precip: string (nullable = true)
 |-- Temperature: string (nullable = true)
 |-- app_Temperature: string (nullable = true)
 |-- Humidity: string (nullable = true)
 |-- Wind_Speed: string (nullable = true)
 |-- Wind_Bearing: string (nullable = true)
 |-- Visibility: string (nullable = true)
 |-- Loud_Cover: string (nullable = true)
 |-- Pressure: string (nullable = true)
 |-- Daily_Summary: string (nullable = true)



In [30]:
df_renamed.limit(5).show()

+--------------------+-------------+------+-----------------+------------------+--------+------------------+------------+------------------+----------+--------+--------------------+
|                date|      Summary|precip|      Temperature|   app_Temperature|Humidity|        Wind_Speed|Wind_Bearing|        Visibility|Loud_Cover|Pressure|       Daily_Summary|
+--------------------+-------------+------+-----------------+------------------+--------+------------------+------------+------------------+----------+--------+--------------------+
|2006-04-01 00:00:...|Partly Cloudy|  rain|9.472222222222221|7.3888888888888875|    0.89|           14.1197|       251.0|15.826300000000002|       0.0| 1015.13|Partly cloudy thr...|
|2006-04-01 01:00:...|Partly Cloudy|  rain|9.355555555555558| 7.227777777777776|    0.86|           14.2646|       259.0|15.826300000000002|       0.0| 1015.63|Partly cloudy thr...|
|2006-04-01 02:00:...|Mostly Cloudy|  rain|9.377777777777778| 9.377777777777778|    0.89|3

In [31]:
def quick_overview(df):
   # display the spark dataframe
   print("FIRST RECORDS")
   print(df.limit(2).sort(col("Formatted Date"), ascending=True).toPandas())

   # count null values
   print("COUNT NULL VALUES")
   print(df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c, y in df.dtypes if y in ["double", "float"]]
      ).toPandas())

#    print("DESCRIBE STATISTICS")
#    print(df.describe().toPandas())
#    Alternatively to get the max value, we could use max_value = df.agg({"precipitation": "max"}).collect()[0][0]

   # check for dublicates
   dublicates = df.groupby(df.date) \
    .count() \
    .where('count > 1') \
    .limit(5).toPandas()
    
   print("DUPLICATES:")
   print(dublicates)

   # print schema
   print("PRINT SCHEMA")
   print(df.printSchema())

quick_overview(df_renamed)

FIRST RECORDS
                            date        Summary precip        Temperature  \
0  2006-04-01 00:00:00.000 +0200  Partly Cloudy   rain  9.472222222222221   
1  2006-04-01 01:00:00.000 +0200  Partly Cloudy   rain  9.355555555555558   

      app_Temperature Humidity Wind_Speed Wind_Bearing          Visibility  \
0  7.3888888888888875     0.89    14.1197        251.0  15.826300000000002   
1   7.227777777777776     0.86    14.2646        259.0  15.826300000000002   

  Loud_Cover Pressure                      Daily_Summary  
0        0.0  1015.13  Partly cloudy throughout the day.  
1        0.0  1015.63  Partly cloudy throughout the day.  
COUNT NULL VALUES
Empty DataFrame
Columns: []
Index: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74

In [32]:
from pyspark.sql.functions import col, isnan, when, count, udf, year, month, to_date, mean
def binner(Temperature):
        if (Temperature is None):
            return "unknown"
        else:
            if Temperature < -10:
                return "freezing cold"
            elif Temperature < -5:
                return "very cold"
            elif Temperature < 0:
                return "cold"
            elif Temperature < 10:
                return "normal"
            elif Temperature < 20:
                return "warm"
            elif Temperature < 30:
                return "hot"
            elif Temperature >= 30:
                return "very hot"
        return "normal"


udf_binner_temp = udf(binner, StringType() )
df_renamed = df_renamed.withColumn("temp_buckets", udf_binner_temp(col("Temperature")))
