# Importing library

In [1]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql import *
from pyspark.sql.types import *
sc = SparkContext('local')
spark = SparkSession(sc)



In [2]:
# pip install pyspark

In [3]:
print(14 * " >", "\t START LAB \t", "< " * 14, "\n\n\n")

 > > > > > > > > > > > > > > 	 START LAB 	 < < < < < < < < < < < < < <  





# Importing Dataset

In [4]:
#For Pyspark
df = spark.read.csv('GlobalLandTemperaturesByMajorCity.csv', header=True, inferSchema=True)
#For native SQL
csvFile = spark.read.csv('GlobalLandTemperaturesByMajorCity.csv', header=True, inferSchema=True) \
                    .createOrReplaceTempView("GlobTemp")

# Read Data

In [5]:
df

DataFrame[dt: date, AverageTemperature: double, AverageTemperatureUncertainty: double, City: string, Country: string, Latitude: string, Longitude: string]

In [6]:
csvFile

In [7]:
spark.sql("SELECT COUNT(*) FROM GlobTemp").show()

+--------+
|count(1)|
+--------+
|  239177|
+--------+



In [8]:
column_count = len(spark.table("GlobTemp").columns)

# Data Preprocessing

### Checking data type

In [9]:
df

DataFrame[dt: date, AverageTemperature: double, AverageTemperatureUncertainty: double, City: string, Country: string, Latitude: string, Longitude: string]

In [10]:
spark.sql("""
    SELECT dt
    FROM GlobTemp
    WHERE LENGTH(dt) != 10 OR dt NOT LIKE '____-__-__'
""").show()

+---+
| dt|
+---+
+---+



### Checking missing values

In [11]:
spark.sql("""
    SELECT COUNT(*) as missing_values_count
    FROM GlobTemp
    WHERE AverageTemperature IS NULL
""").show()

+--------------------+
|missing_values_count|
+--------------------+
|               11002|
+--------------------+



In [12]:
# new df without missing values
cleaned_df = spark.sql("""
    SELECT * FROM GlobTemp
    WHERE AverageTemperature IS NOT NULL
""")
cleaned_df

DataFrame[dt: date, AverageTemperature: double, AverageTemperatureUncertainty: double, City: string, Country: string, Latitude: string, Longitude: string]

In [13]:
cleaned_df.createOrReplaceTempView("GlobTemp")

In [14]:
spark.sql("""
    SELECT COUNT(*) as missing_values_count
    FROM GlobTemp
    WHERE AverageTemperature IS NULL
""").show()

+--------------------+
|missing_values_count|
+--------------------+
|                   0|
+--------------------+



In [15]:
spark.sql("""
    SELECT COUNT(*) as missing_values_count
    FROM GlobTemp
    WHERE AverageTemperatureUncertainty IS NULL
""").show()

+--------------------+
|missing_values_count|
+--------------------+
|                   0|
+--------------------+



In [16]:
spark.sql("""
    SELECT COUNT(*) as missing_values_count
    FROM GlobTemp
    WHERE dt IS NULL
""").show()

+--------------------+
|missing_values_count|
+--------------------+
|                   0|
+--------------------+



In [17]:
spark.sql("""
    SELECT COUNT(*) as missing_values_count
    FROM GlobTemp
    WHERE City IS NULL
""").show()

+--------------------+
|missing_values_count|
+--------------------+
|                   0|
+--------------------+



In [18]:
spark.sql("""
    SELECT COUNT(*) as missing_values_count
    FROM GlobTemp
    WHERE Country IS NULL
""").show()

+--------------------+
|missing_values_count|
+--------------------+
|                   0|
+--------------------+



In [19]:
spark.sql("""
    SELECT COUNT(*) as missing_values_count
    FROM GlobTemp
    WHERE Latitude IS NULL
""").show()

+--------------------+
|missing_values_count|
+--------------------+
|                   0|
+--------------------+



In [20]:
spark.sql("""
    SELECT COUNT(*) as missing_values_count
    FROM GlobTemp
    WHERE Longitude IS NULL
""").show()

+--------------------+
|missing_values_count|
+--------------------+
|                   0|
+--------------------+



### Checking outliers

In [21]:
spark.sql("""
    SELECT 
        percentile_approx(AverageTemperature, 0.25) as Q1,
        percentile_approx(AverageTemperature, 0.75) as Q3
    FROM GlobTemp
""").show()

+-----+------------------+
|   Q1|                Q3|
+-----+------------------+
|12.71|25.918000000000003|
+-----+------------------+



IQR = Q3 - Q1 = 25.918 - 12.71 = 13.208
- Lower Bound (LB) = Q1 - 1.5 * IQR = 12.71 - 1.5 * 13.208 = -7.602
- Upper Bound (UB) = Q3 + 1.5 * IQR = 25.918 + 1.5 * 13.208 = 46.230

In [22]:
spark.sql("""
    SELECT dt, AverageTemperature, City
    FROM GlobTemp
    WHERE AverageTemperature < -7.602 OR AverageTemperature > 46.230
""").show()


+----------+-------------------+---------+
|        dt| AverageTemperature|     City|
+----------+-------------------+---------+
|1767-01-01| -8.252999999999998|   Berlin|
|1776-01-01|             -8.336|   Berlin|
|1788-12-01| -7.790000000000001|   Berlin|
|1795-01-01|             -8.019|   Berlin|
|1823-01-01|             -9.809|   Berlin|
|1829-12-01|             -8.242|   Berlin|
|1830-01-01|             -7.604|   Berlin|
|1838-01-01|             -9.813|   Berlin|
|1848-01-01|             -9.276|   Berlin|
|1855-02-01|              -7.67|   Berlin|
|1893-01-01|             -8.071|   Berlin|
|1929-02-01|            -10.125|   Berlin|
|1940-01-01|             -9.689|   Berlin|
|1942-01-01| -7.867000000000001|   Berlin|
|1947-02-01|             -8.272|   Berlin|
|1956-02-01|             -9.646|   Berlin|
|1963-01-01|             -8.026|   Berlin|
|1820-12-01|            -15.398|Changchun|
|1821-01-01|            -15.507|Changchun|
|1821-02-01|-11.039000000000001|Changchun|
+----------

Outliers analysis:
- Berlin has had very low temperatures in January for many years, probably due to the fact that it was winter.
- Changchun's winter temperature is also very low, which is in line with the actual situation of its geographical location.

##### None of them are defined as outlier and no need to handle them.

### Checking duplicates

In [23]:
spark.sql("""
    SELECT dt, AverageTemperature, City, COUNT(*) as duplicates
    FROM GlobTemp
    GROUP BY dt, AverageTemperature, City
    HAVING duplicates > 1
""").show()

# no deplicates

+---+------------------+----+----------+
| dt|AverageTemperature|City|duplicates|
+---+------------------+----+----------+
+---+------------------+----+----------+



### Checking data consistency

In [24]:
spark.sql("""
    SELECT DISTINCT City
    FROM GlobTemp
    WHERE City NOT RLIKE '^[A-Z][a-z]*$'
""").show()

# data consistency

+----------------+
|            City|
+----------------+
|Ho Chi Minh City|
|     Los Angeles|
|   Santo Domingo|
|   Dar Es Salaam|
|     Addis Abeba|
|       Cape Town|
|  Belo Horizonte|
|       São Paulo|
|      Umm Durman|
|Saint Petersburg|
|        Brasília|
|       New Delhi|
|        New York|
|  Rio De Janeiro|
|          Bogotá|
+----------------+



# Task 1: Aggreagte by Key
- Sum
- Max
- Min

In [25]:
spark.sql("""SELECT YEAR(dt) AS Year, SUM(AverageTemperature) AS totalTemp, Max(AverageTemperature) AS maxTemp, Min(AverageTemperature) AS minTemp 
             FROM GlobTemp 
             GROUP BY Year 
             ORDER BY Year DESC""") \
     .show(10)

+----+------------------+-----------------+-------------------+
|Year|         totalTemp|          maxTemp|            minTemp|
+----+------------------+-----------------+-------------------+
|2013| 16352.24599999999|37.12600000000001|            -21.106|
|2012|23601.887000000002|           37.859|            -20.079|
|2011|         23459.036|           37.184|            -22.029|
|2010| 23894.08100000002|           37.899|            -18.555|
|2009|23800.501999999986|           36.607|-17.855999999999998|
|2008|23530.534000000043|           37.143|            -18.649|
|2007|23825.123999999985|           36.429|-12.937000000000001|
|2006|23752.792000000012|           37.041|-18.862000000000002|
|2005|23528.687000000013|           36.512|-17.621000000000006|
|2004|23606.644999999968|           36.542|-16.813000000000002|
+----+------------------+-----------------+-------------------+
only showing top 10 rows



# Task 2: Window Functions

In [26]:
from pyspark.sql import functions as F
windowval = (Window.partitionBy('City').orderBy('dt')
             .rangeBetween(Window.unboundedPreceding, 0))
df_w_cumsum = df.withColumn('cum_sum', F.sum('AverageTemperature').over(windowval))
df_w_cumsum.show(10)

+----------+------------------+-----------------------------+-------+-------------+--------+---------+------------------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|   City|      Country|Latitude|Longitude|           cum_sum|
+----------+------------------+-----------------------------+-------+-------------+--------+---------+------------------+
|1849-01-01|            26.704|                        1.435|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|            26.704|
|1849-02-01|            27.434|                        1.362|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|54.138000000000005|
|1849-03-01|            28.101|                        1.612|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|            82.239|
|1849-04-01|             26.14|           1.3869999999999998|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|           108.379|
|1849-05-01|            25.427|                          1.2|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|           133.806|
|1849-06-01|            

# Task 3: Pivot Tables

In [27]:
df.groupBy("City").pivot("Country").sum("AverageTemperature") \
.show(10)

+----------------+-----------+------+---------+----------+-----------------+-----+------+-----+-----------------+--------+----------------------------------+-------------+------------------+-----------------+--------+------+-------+-----------------+---------+-----------------+----+-----+-----+-----+------+-----------------+-------+--------+-----------------+-----------+------+------------+-------+------------------+-------+------------+-----------+------------------+-----+-----+------+--------+--------+------+-------+--------------+-------------+-----------------+--------+
|            City|Afghanistan|Angola|Australia|Bangladesh|           Brazil|Burma|Canada|Chile|            China|Colombia|Congo (Democratic Republic Of The)|Côte D'Ivoire|Dominican Republic|            Egypt|Ethiopia|France|Germany|            India|Indonesia|             Iran|Iraq|Italy|Japan|Kenya|Mexico|          Morocco|Nigeria|Pakistan|             Peru|Philippines|Russia|Saudi Arabia|Senegal|         Singapore

# Task 4: Multi-Level Aggregation
- city

In [28]:
spark.sql("""SELECT City, AVG(ds.sumTemp) AS avgTemp 
             FROM (SELECT dt,
                          City,
                          SUM(AverageTemperature) AS sumTemp
                   FROM GlobTemp
                   GROUP BY dt, City) AS ds 
             GROUP BY City""") \
     .show(10)

+----------------+------------------+
|            City|           avgTemp|
+----------------+------------------+
|       Bangalore|24.855895933014303|
|           Cairo| 21.22125921375925|
|      Casablanca|17.184157858613595|
|       Guangzhou| 21.60868426103644|
|       Fortaleza|27.008639541892705|
|Ho Chi Minh City|27.193983566940563|
|            Lima| 16.76911965811967|
|          Madrid|11.448704042956397|
|         Mashhad|12.571992111368898|
|       Singapore|26.523102826510698|
+----------------+------------------+
only showing top 10 rows

