In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, to_date, to_timestamp, year, avg, min, max
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [2]:
from datetime import datetime

In [3]:
import time

In [4]:
start_time = time.time()
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
print("--- %s seconds ---" % (time.time() - start_time))

--- 6.38475775718689 seconds ---


In [5]:
start_time = time.time()
df = spark.read.options(header='True', inferSchema='True', delimiter=',') \
    .csv("/home/jovyan/weather.csv") 
print("--- %s seconds ---" % (time.time() - start_time))

--- 18.396015644073486 seconds ---


In [6]:
df = df.withColumn("StartTime", to_timestamp(df.StartTime)) \
     .withColumn("EndTime", to_timestamp(df.EndTime)) 

In [7]:
df.printSchema()
df.show(truncate=False)

root
 |-- EventId: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Severity: string (nullable = true)
 |-- StartTime: timestamp (nullable = true)
 |-- EndTime: timestamp (nullable = true)
 |-- TimeZone: string (nullable = true)
 |-- LocationLat: double (nullable = true)
 |-- LocationLng: double (nullable = true)
 |-- AirportCode: string (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- ZipCode: integer (nullable = true)

+-------+----+--------+-------------------+-------------------+-----------+-----------+-----------+-----------+--------+--------+-----+-------+
|EventId|Type|Severity|StartTime          |EndTime            |TimeZone   |LocationLat|LocationLng|AirportCode|City    |County  |State|ZipCode|
+-------+----+--------+-------------------+-------------------+-----------+-----------+-----------+-----------+--------+--------+-----+-------+
|W-310  |Rain|Light   |2016-08-01 00:16:0

In [8]:
cityPar = 'Paynesville'
yearPar = 2016

In [9]:
start_time = time.time()
df.filter((df.City == cityPar) & \
        (year(df.StartTime) == yearPar))\
        .groupBy("Type")\
        .count()\
        .show(truncate=False)  
print("--- %s seconds ---" % (time.time() - start_time))

+-------------+-----+
|Type         |count|
+-------------+-----+
|Cold         |233  |
|Fog          |69   |
|Precipitation|3    |
|Rain         |197  |
|Snow         |90   |
+-------------+-----+

--- 18.041224002838135 seconds ---


In [10]:
start_time = time.time()
df.groupBy("County", year(df.StartTime))\
    .agg(avg("LocationLat").alias("avg_latitude"), \
        min("LocationLat").alias("min_latitude"), \
        max("LocationLat").alias("max_latitude"), \
        avg("LocationLng").alias("avg_longitude"), \
        min("LocationLng").alias("min_longitude"), \
        max("LocationLng").alias("max_longitude")) \
        .orderBy("County")\
        .show(truncate=False)  
print("--- %s seconds ---" % (time.time() - start_time))

+--------+---------------+------------------+------------+------------+------------------+-------------+-------------+
|County  |year(StartTime)|avg_latitude      |min_latitude|max_latitude|avg_longitude     |min_longitude|max_longitude|
+--------+---------------+------------------+------------+------------+------------------+-------------+-------------+
|Accomack|2020           |37.80375201550392 |37.6469     |37.9372     |-75.60176255814027|-75.7611     |-75.4662     |
|Accomack|2018           |37.78434527150702 |37.6469     |37.9372     |-75.62147681513244|-75.7611     |-75.4662     |
|Accomack|2017           |37.77527669464169 |37.6469     |37.9372     |-75.63068908973649|-75.7611     |-75.4662     |
|Accomack|2016           |37.781883852140076|37.6469     |37.9372     |-75.62397723735405|-75.7611     |-75.4662     |
|Accomack|2019           |37.795315593483345|37.6469     |37.9372     |-75.61033266097826|-75.7611     |-75.4662     |
|Ada     |2016           |43.567            |43.