<a href="https://colab.research.google.com/github/w4bo/handsOnDataPipelines/blob/main/materials/03-BigData.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!wget http://big.csr.unibo.it/projects/nosql-datasets/2022-bbs-dsaa-foodmart.csv
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
import os

# Point JAVA_HOME at the Java 8 installation and SPARK_HOME at the unpacked Spark distribution.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
    }
)

import findspark

# Called only after the environment variables above have been set.
findspark.init()

In [None]:
from pyspark.sql import SparkSession

# Build a Spark session named "Colab" with master "local[*]";
# getOrCreate() reuses an existing session if one is already active.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Colab")
    .getOrCreate()
)
spark

In [None]:
# A short sentence with repeated words, used as input for the RDD examples below
riddle = (
    "sopra la panca la capra campa "
    "sotto la panca la capra crepa"
)

In [None]:
# Create an RDD from the `riddle` string: each element of the RDD is a single word
sc = spark.sparkContext
riddle_words = riddle.split(" ")
rdd = sc.parallelize(riddle_words)
# Question: why is an RDD object displayed here rather than its contents?
rdd

In [None]:
# Example of an action: collect() brings the elements of the RDD back as a Python list
rdd.collect()

In [None]:
# Examples of transformations, chained and finally materialized with collect():
# - map: turn each string into upper case (map returns a new RDD with the same cardinality)
# - filter: keep only the strings beginning with "C" (filter returns a new RDD with the same or smaller cardinality)
# - flatMap: explode each string into its characters (flatMap returns a new RDD with any cardinality)
(
    rdd
    .map(lambda word: word.upper())
    .filter(lambda word: word.startswith("C"))
    .flatMap(lambda word: list(word))
    .collect()
)

In [None]:
# A simple word count
# - map each word to a tuple (word, 1); each tuple represents a count of 1 associated with that word
# - group all the tuples with the same word and sum their counts
# - sort the (word, count) tuples by count
# - collect the resulting tuples
(
    rdd
    .map(lambda word: (word, 1))
    .reduceByKey(lambda left, right: left + right)
    .sortBy(lambda pair: pair[1])
    .collect()
)

In [None]:
# First look at the CSV file: the first line is the header, fields are separated by commas
df = spark.read.csv(
    "2022-bbs-dsaa-foodmart.csv",
    header=True,
    sep=",",
)
# Print the first 5 rows
df.show(5)

In [None]:
from pyspark.sql.functions import col
# Import the data
# - read the file, which is in csv (comma separated values) format with a header line
# - select some of its columns: rename 'Product (Category)' to 'product' and cast 'unit sales' to int
# - cache the resulting DataFrame
df = (
    spark.read
    .csv("2022-bbs-dsaa-foodmart.csv", header=True, sep=",")
    .select(
        col("Product (Category)").alias("product"),
        col("subcategory"),
        col("category"),
        col("unit sales").cast("int"),
    )
    .cache()
)

df.show(5)

In [None]:
# Show only the sales with category = 'Pizza' (20 rows, without truncating long values)
(
    df
    .filter("category = 'Pizza'")
    .show(n=20, truncate=False)
)

In [None]:
# Count all the product sales, i.e., the number of rows in df
df.count()

In [None]:
# Count the distinct products
# - keep only the product column
# - drop duplicate values
# - count the remaining rows
(
    df
    .select("product")
    .distinct()
    .count()
)

In [None]:
# Get the number of products with category = 'Pizza'
# - keep only the rows whose category is 'Pizza'
# - keep only the product column
# - drop duplicate values
# - count the remaining rows
(
    df
    .filter("category = 'Pizza'")
    .select("product")
    .distinct()
    .count()
)

In [None]:
# Get the average sales by category
# - group the rows by category
# - average the unit sales within each group
(
    df
    .groupBy("category")
    .avg("unit sales")
    .show()
)