# Setup

## Spark session and context

In [None]:
# Install PySpark into the notebook's Python environment (IPython shell escape).
!pip install pyspark

In [None]:
import pyspark

# Attach to the active Spark session (creating one if none exists) and keep
# a handle on its SparkContext for the RDD API used throughout the notebook.
spark = (
    pyspark.sql.SparkSession
    .builder
    .getOrCreate()
)
sc = spark.sparkContext

## Spark Web UI

In [None]:
# Install pyngrok, used below to expose the Spark web UI through a public tunnel.
!pip install pyngrok

In [None]:
from pyngrok import ngrok

# Close any tunnels left over from a previous run of this cell so they do
# not accumulate, then open a fresh one to port 4040 (the Spark web UI)
# and display its public URL as the cell output.
for tunnel in ngrok.get_tunnels():
    ngrok.disconnect(tunnel.public_url)

ngrok.connect(4040).public_url

## Download the dataset


In [None]:
# Download a snapshot archive of NOAA's USCRN hourly data (hourly02 product).
!wget https://www.ncei.noaa.gov/pub/data/uscrn/products/hourly02/snapshots/CRNH0203-202101180550.zip

In [None]:
# Extract the archive into ./data (later cells read data/HEADERS.txt and data/2020/*.txt).
!unzip CRNH0203-202101180550.zip -d data

# Introduction to RDDs

In [None]:
# Distribute the integers 0 .. 999,999 as an RDD (nothing runs until an action).
numbers = sc.parallelize(range(1_000_000))

In [None]:
# Action: bring only the first five elements back to the driver.
numbers.take(num=5)

In [None]:
# Action: runs a job over every partition to count the elements (no caching yet).
numbers.count()

In [None]:
# Same data, but named (easier to spot in the Spark UI) and marked for
# caching; cache() is lazy, so the count() action is what fills the cache.
numbers = (
    sc.parallelize(range(1_000_000))
    .setName("numbers")
    .cache()
)
numbers.count()

In [None]:
# Take the first five elements again, this time from the cached RDD.
numbers.take(num=5)

In [None]:
# Rebuild the RDD spread over 20 partitions (repartition shuffles the data),
# cache it under its own name and materialize the cache with count().
numbers = (
    sc.parallelize(range(1_000_000))
    .repartition(20)
    .setName("numbers3")
    .cache()
)
numbers.count()

# First MapReduce exercises

### Sum

In [None]:
# Sum with a bare reduce: addition is associative and commutative, so Spark
# may combine partial results in any order across partitions.
numbers.reduce(lambda acc, value: acc + value)

In [None]:
# Built-in action computing the same sum as the reduce above.
numbers.sum()

### Average

In [None]:
# Average in one pass: pair every value with a count of 1, add the pairs
# component-wise across the RDD, then divide total by count on the driver.
total, count = numbers.map(lambda value: (value, 1)).reduce(
    lambda left, right: (left[0] + right[0], left[1] + right[1])
)

total / count

In [None]:
# Built-in action for the same average.
numbers.mean()

### Maximum

In [None]:
# Maximum via reduce: keep the larger of each pair (the left one on ties).
numbers.reduce(lambda left, right: right if right > left else left)

In [None]:
# The builtin max already accepts two arguments, so it is a valid reducer as-is.
numbers.reduce(f=max)

In [None]:
# Built-in action for the maximum element.
numbers.max()

# Real-life RDD example: temperatures dataset

### Load the data in an RDD

In [None]:
# Column names: take the second line (index 1) of HEADERS.txt and split it on
# whitespace; the names are looked up later with header.index(...).
header = (
    sc.textFile("data/HEADERS.txt")
    .map(lambda line: line.strip().split())
    .collect()[1]
)

In [None]:
# Peek at the first lines of one station's 2020 file to see the raw layout.
!head data/2020/CRNH0203-2020-NY_Ithaca_13_E.txt

In [None]:
# One record per line of every 2020 station file, split on whitespace into a
# list of string fields (str.split with no argument == line.split()).
weather_rdd = sc.textFile("data/2020/*.txt").map(str.split)

In [None]:
# Mark the parsed records for caching, then materialize them with a count;
# every later pass over weather_rdd can reuse the cached partitions.
weather_rdd.cache()
weather_rdd.count()

### Maximum temperature per measurement station

In [None]:
# Field positions of the station identifier and the hourly average temperature.
index_station, index_temperature = (
    header.index("WBANNO"),
    header.index("T_HR_AVG"),
)

In [None]:
# Highest hourly average temperature per station, as (station, maximum) pairs;
# the builtin max serves directly as the per-key reducer.
temperature_maxima = (
    weather_rdd
    .map(lambda fields: (fields[index_station], float(fields[index_temperature])))
    .reduceByKey(max)
    .collect()
)

### Average temperatures per place

#### Two-pass solution

In [None]:
temperature_sums = weather_rdd.map(
    lambda row: (row[index_station], float(row[index_temperature]))
).reduceByKey(
    lambda a, b: a + b
).collect()

In [None]:
temperature_counts = weather_rdd.map(
    lambda row: (row[index_station], 1)
).reduceByKey(
    lambda a, b: a + b
).collect()

In [None]:
temperature_sums = dict(temperature_sums)
temperature_counts = dict(temperature_counts)
temperature_averages = {}

for station in temperature_sums.keys():
    temperature_averages[station] = temperature_sums[station] / temperature_counts[station]

#### Single-pass solution

In [None]:
# Single pass: carry a (sum, count) pair per station, merge the pairs
# component-wise, then divide once per station.
temperature_averages = weather_rdd.map(
    lambda row: (row[index_station], float(row[index_temperature]))
).filter(
    # Skip missing readings (encoded as -9999.0) so they do not skew the means.
    lambda kv: kv[1] > -9999.0
).mapValues(
    lambda temperature: (temperature, 1)
).reduceByKey(
    lambda a, b: (a[0] + b[0], a[1] + b[1])
).mapValues(
    lambda acc: acc[0] / acc[1]
).collect()

### Temperatures histogram

In [None]:
from matplotlib import pyplot as plt

In [None]:
# Frequency of each distinct temperature value, collected to the driver as
# a list of (temperature, count) pairs.
counts = (
    weather_rdd
    .map(lambda fields: (float(fields[index_temperature]), 1))
    .reduceByKey(lambda left, right: left + right)
    .collect()
)

In [None]:
# Plot the histogram, excluding the -9999.0 missing-value sentinel explicitly
# rather than assuming it is the first key after sorting. dict(...) accepts
# both the list of pairs from reduceByKey and the dict from countByValue, so
# this cell also works if re-run after `counts` has been reassigned.
x, y = zip(*sorted(
    (temperature, n)
    for temperature, n in dict(counts).items()
    if temperature > -9999.0
))
plt.bar(x, y)

In [None]:
# Same frequencies via the countByValue action, which returns a dict-like
# {temperature: count} mapping on the driver instead of a list of pairs.
counts = (
    weather_rdd
    .map(lambda fields: float(fields[index_temperature]))
    .countByValue()
)

### Average, min and max temperature per place - in a single pass!

In [None]:
# One pass over the data: per station, carry (count, sum, min, max), merge
# the partial tuples pairwise, then turn each merged tuple into statistics.
temperature_stats = weather_rdd.map(
    lambda row: (row[index_station], float(row[index_temperature]))
).filter(
    # Drop missing readings (encoded as -9999.0) before they reach the stats.
    lambda kv: kv[1] > -9999.0
).mapValues(
    # Seed tuple for a single reading: count 1, and the reading itself as
    # the running sum, minimum and maximum.
    lambda temperature: (1, temperature, temperature, temperature)
).reduceByKey(
    lambda a, b: (
        a[0] + b[0],       # running count
        a[1] + b[1],       # running sum
        min(a[2], b[2]),   # running minimum
        max(a[3], b[3]),   # running maximum
    )
).mapValues(
    # Keep the station key: the result is (station, (average, min, max)),
    # so every statistic can be attributed to its station.
    lambda s: (
        s[1] / s[0],  # average
        s[2],         # minimum
        s[3],         # maximum
    )
).collect()

# Spark SQL

## From RDD to DataFrame

In [None]:
# A tiny RDD of plain Python dicts, one per product.
products_rdd = sc.parallelize([
    dict(name=product, price=cost)
    for product, cost in (('iPhone', 800.), ('Galaxy', 799.), ('Huawei', 789.))
])

In [None]:
from pyspark.sql import Row

In [None]:
# The same products as pyspark Row objects, whose named fields toDF() can
# turn into DataFrame columns.
products_rdd = sc.parallelize([
    Row(name=product, price=cost)
    for product, cost in (('iPhone', 800.), ('Galaxy', 799.), ('Huawei', 789.))
])

In [None]:
# Convert the RDD of Rows into a DataFrame; no schema is passed, so it is inferred.
products_df = products_rdd.toDF()

In [None]:
# Collect to the driver as a pandas DataFrame for display (fine for tiny data).
products_df.toPandas()

## Temperatures dataset: Spark SQL version

In [None]:
# Column names come from HEADERS.txt; every field is still a string here.
weather_df = weather_rdd.toDF(schema=header)

In [None]:
# Preview the first ten rows as a pandas table.
weather_df.limit(num=10).toPandas()

In [None]:
# Mark the DataFrame for caching and materialize it with a count.
weather_df.cache()
weather_df.count()

In [None]:
# Random sample of roughly 0.01% of the rows (with replacement) for a quick look.
weather_df.sample(fraction=0.0001, withReplacement=True).toPandas()

In [None]:
# The temperature column was parsed as text; cast it to float so it can be
# aggregated numerically.
weather_df = weather_df.withColumn("T_HR_AVG", weather_df.T_HR_AVG.cast("float"))

In [None]:
# Average hourly temperature per station, ignoring missing readings (encoded
# as -9999.0) so they do not drag the means down.
temperature_averages = (
    weather_df
    .filter(weather_df["T_HR_AVG"] > -9999.0)
    .groupBy("WBANNO")
    .mean("T_HR_AVG")
    .toPandas()
)

## Query with SQL language

In [None]:
# Register the DataFrame as the SQL view "weather" (replacing any previous one).
weather_df.createOrReplaceTempView("weather")

In [None]:
# Same per-station average in SQL. Missing readings (encoded as -9999.0) are
# excluded so they do not skew the averages; output columns are unchanged.
temperature_averages = spark.sql(
    "SELECT WBANNO, avg(T_HR_AVG) "
    "FROM weather "
    "WHERE T_HR_AVG > -9999.0 "
    "GROUP BY WBANNO"
).toPandas()