In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
# CASSANDRA CONFIGURATION
cassandra_host = "cassandra"
spark = SparkSession \
    .builder \
    .master("local") \
    .appName('jupyter-pyspark') \
      .config("spark.cassandra.connection.host", cassandra_host) \
      .config("spark.jars.packages","com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.1.0")\
    .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")

In [4]:
w_df = spark.read.json("file:////home/jovyan/datasets/weather/weather.json")
w_df.printSchema()

root
 |-- 2020census: long (nullable = true)
 |-- city: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- date: string (nullable = true)
 |-- description: string (nullable = true)
 |-- dew_point: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- moon_phase: double (nullable = true)
 |-- pct_clouds: long (nullable = true)
 |-- pct_humidity: long (nullable = true)
 |-- pressure: long (nullable = true)
 |-- rainfall: double (nullable = true)
 |-- snowfall: double (nullable = true)
 |-- state: string (nullable = true)
 |-- temperature.day: double (nullable = true)
 |-- temperature.eve: double (nullable = true)
 |-- temperature.max: double (nullable = true)
 |-- temperature.min: double (nullable = true)
 |-- temperature.morn: double (nullable = true)
 |-- temperature.night: double (nullable = true)
 |-- timezone: string (nullable = true)
 |-- uv_index: double (nullable = true)
 |-- wind.direction_deg: long (nu

                                                                                

In [5]:
print(w_df.count())

1600


In [6]:
print(w_df.select("date").distinct().count())



8


                                                                                

In [7]:
print(w_df.select("city","state").distinct().count())



200


                                                                                

In [8]:
print(w_df.select("date","city","state").distinct().count())



1600


                                                                                

In [9]:
w_df.toPandas()

Unnamed: 0,2020census,city,condition,date,description,dew_point,latitude,longitude,moon_phase,pct_clouds,...,temperature.eve,temperature.max,temperature.min,temperature.morn,temperature.night,timezone,uv_index,wind.direction_deg,wind.gust,wind.speed
0,8804190,New York,Clear,2021-10-19,clear sky,35.20,40.712728,-74.006015,0.47,0,...,63.82,67.53,50.32,50.45,61.65,America/New_York,3.32,291,31.52,16.35
1,8804190,New York,Clouds,2021-10-20,scattered clouds,48.34,40.712728,-74.006015,0.50,27,...,71.89,75.00,58.55,58.68,65.25,America/New_York,3.49,289,31.52,12.68
2,8804190,New York,Clouds,2021-10-21,broken clouds,46.96,40.712728,-74.006015,0.53,84,...,71.71,74.16,60.12,60.48,67.21,America/New_York,3.32,208,33.58,13.00
3,8804190,New York,Clouds,2021-10-22,scattered clouds,47.93,40.712728,-74.006015,0.56,27,...,65.68,68.20,59.32,62.19,59.32,America/New_York,2.44,220,34.05,14.05
4,8804190,New York,Rain,2021-10-23,light rain,37.71,40.712728,-74.006015,0.59,51,...,59.22,61.30,53.04,57.63,53.04,America/New_York,1.68,315,16.49,11.16
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1595,138486,Waco,Rain,2021-10-22,light rain,64.78,31.549333,-97.146670,0.56,19,...,77.79,85.93,67.26,67.26,73.90,America/Chicago,5.15,177,13.33,8.79
1596,138486,Waco,Rain,2021-10-23,light rain,64.81,31.549333,-97.146670,0.59,16,...,80.04,87.42,67.32,67.32,73.94,America/Chicago,5.27,165,27.96,13.82
1597,138486,Waco,Rain,2021-10-24,light rain,69.13,31.549333,-97.146670,0.62,100,...,79.72,83.14,69.98,70.52,75.67,America/Chicago,6.00,174,30.53,14.94
1598,138486,Waco,Clouds,2021-10-25,overcast clouds,69.85,31.549333,-97.146670,0.65,100,...,80.98,84.54,71.67,73.76,78.26,America/Chicago,6.00,182,30.87,17.67


In [10]:
!pip install -q cassandra-driver


In [29]:
query = '''
create table if not exists glab.weather_date(
census2020 int,
city text,
condition text,
weatherdate date,
description text,
dew_point decimal,
latitude decimal,
longitude decimal,
moon_phase decimal,
pct_clouds int,
pct_humidity int,
pressure int,
rainfall decimal,
snowfall decimal,
state text,
temperature_day decimal,
temperature_eve decimal,
temperature_max decimal,
temperature_min decimal,
temperature_morn decimal,
temperature_night decimal,
timezone text,
uv_index decimal,
wind_direction_deg int,
wind_gust decimal,
wind_speed decimal,
primary key(weatherdate,state,city)
);
'''
from cassandra.cluster import Cluster
with Cluster([cassandra_host]) as cluster:
    session = cluster.connect()
    session.execute(query)

In [20]:
weather = w_df.toDF("census2020",
"city",
"condition",
"weatherdate",
"description",
"dew_point" ,
"latitude",
"longitude",
"moon_phase",
"pct_clouds",
"pct_humidity",
"pressure",
"rainfall",
"snowfall",
"state",
"temperature_day",
"temperature_eve",
"temperature_max",
"temperature_min",
"temperature_morn",
"temperature_night",
"timezone",
"uv_index",
"wind_direction_deg",
"wind_gust",
"wind_speed")

In [30]:
weather.write.format("org.apache.spark.sql.cassandra")\
.mode("Append")\
.option("table","weather_date")\
.option("keyspace","glab")\
.save()

In [32]:
weather2 = spark.read.format("org.apache.spark.sql.cassandra")\
.option("table","weather_date")\
.option("keyspace","glab")\
.load()

weather2.count()

1600

In [33]:
weather2.createOrReplaceTempView("weather_dt")

In [34]:
query = '''
select city, state,weatherdate,condition,description, temperature_day from daily_city_weather where city = '
Syracuse' and state = 'New York'
'''
spark.sql(query).explain()

== Physical Plan ==
*(1) Project [city#306, state#305, weatherdate#307, condition#309, description#310, temperature_day#320]
+- BatchScan[state#305, city#306, weatherdate#307, condition#309, description#310, temperature_day#320] Cassandra Scan: glab.daily_city_weather
 - Cassandra Filters: [["state" = ?, New York],["city" = ?, 
Syracuse]]
 - Requested Columns: [state,city,weatherdate,condition,description,temperature_day]




In [35]:
#Qn9 

query = '''
select city, state,weatherdate,condition,description, temperature_day from daily_city_weather
where condition = 'Rain' and weatherdate = '2021-10-23'
'''

spark.sql(query).explain()

== Physical Plan ==
*(1) Project [city#306, state#305, weatherdate#307, condition#309, description#310, temperature_day#320]
+- *(1) Filter (condition#309 = Rain)
   +- BatchScan[state#305, city#306, weatherdate#307, condition#309, description#310, temperature_day#320] Cassandra Scan: glab.daily_city_weather
 - Cassandra Filters: [["weatherdate" = ?, 2021-10-23]]
 - Requested Columns: [state,city,weatherdate,condition,description,temperature_day]


