# Spark Exercise

Apache Spark is well suited to data engineering projects because it processes large-scale data efficiently through distributed computing. Its in-memory processing speeds up data operations, which makes it a good fit for big data workloads. It supports a wide range of data sources and formats for ingestion and transformation, and its API is available in several languages, including Python, Java, and Scala. The ecosystem includes libraries for SQL, machine learning, and graph processing, so complex data pipelines and analytics can be built within a single framework.

Use Python with `pyspark` and `pandas` to explore Apache Spark RDDs and DataFrames:

# Spark RDD

An RDD (Resilient Distributed Dataset) is the fundamental data structure in Apache Spark. It enables fault-tolerant, distributed processing of large datasets across the nodes of a cluster. RDDs support both map-style operations (transformations) and reduce-style operations (aggregations).
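
To make the idea of partitioned processing concrete, here is a minimal sketch in plain Python. It is not Spark code and does not reflect how Spark is implemented: the toy `records` list and the helper `split_into_partitions` are assumptions made up for illustration. The sketch only shows the shape of the computation: data is split into partitions, each partition is mapped independently, and partial results are combined at the end.

```python
from functools import reduce

# Hypothetical toy data standing in for a large dataset.
records = ["spark", "rdd", "map", "reduce", "cluster", "node"]


def split_into_partitions(items, num_partitions):
    """Deal the items round-robin into num_partitions lists (illustrative helper)."""
    return [items[i::num_partitions] for i in range(num_partitions)]


partitions = split_into_partitions(records, 3)

# "Map" step: every partition is transformed on its own, so the work
# could run on different nodes without coordination.
mapped = [[len(word) for word in part] for part in partitions]

# "Reduce" step: compute one partial sum per partition, then combine them.
partial_sums = [reduce(lambda a, b: a + b, part, 0) for part in mapped]
total = reduce(lambda a, b: a + b, partial_sums, 0)

print(partial_sums)  # [11, 10, 7]
print(total)         # 28
```

Because each partition is handled independently, a lost partition can in principle be recomputed from its input alone, which is the intuition behind the "resilient" part of the name.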

## Import Necessary Libraries

In [1]:
import os
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col, to_timestamp, hour, sum, avg

In [2]:
# TODO
!pip install pyspark



Defaulting to user installation because normal site-packages is not writeable


## Spark Context and Session
Initialize Spark Context and Spark Session

In [3]:
# TODO
# Start the SparkSession
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("My PySpark App") \
    .master("local") \
    .getOrCreate()

sc = spark.sparkContext



Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/20 20:06:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/06/20 20:06:06 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/06/20 20:06:06 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/06/20 20:06:06 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
25/06/20 20:06:06 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.


In [4]:
import os
print(os.getcwd())


/home/bdeng-g6/notebooks/Untitled Folder


## Load Data into RDD

In [5]:
# TODO
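import pandas as pd

# Read the CSV with pandas, then convert each row to a plain Python list
df = pd.read_csv("/home/bdeng-g6/notebooks/spark/Top_100_most_Streamed.csv")
data_list = df.values.tolist()

# Distribute the rows as an RDD; collect() returns every row to the driver
rdd = sc.parallelize(data_list)
rdd.collect()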

[['Blinding Lights',
  'The Weeknd',
  'canadian contemporary r&b',
  2020,
  171,
  73,
  51,
  -6,
  9,
  33,
  200,
  0,
  6,
  91],
 ['Watermelon Sugar',
  'Harry Styles',
  'pop',
  2019,
  95,
  82,
  55,
  -4,
  34,
  56,
  174,
  12,
  5,
  88],
 ['Mood (feat. iann dior)',
  '24kGoldn',
  'cali rap',
  2021,
  91,
  72,
  70,
  -4,
  32,
  73,
  141,
  17,
  4,
  88],
 ['Someone You Loved',
  'Lewis Capaldi',
  'pop',
  2019,
  110,
  41,
  50,
  -6,
  11,
  45,
  182,
  75,
  3,
  86],
 ['Perfect',
  'Ed Sheeran',
  'pop',
  2017,
  95,
  45,
  60,
  -6,
  11,
  17,
  263,
  16,
  2,
  86],
 ['Believer',
  'Imagine Dragons',
  'modern rock',
  2017,
  125,
  78,
  78,
  -4,
  8,
  67,
  204,
  6,
  13,
  86],
 ['lovely (with Khalid)',
  'Billie Eilish',
  'electropop',
  2018,
  115,
  30,
  35,
  -10,
  10,
  12,
  200,
  93,
  3,
  86],
 ['Circles',
  'Post Malone',
  'dfw rap',
  2019,
  120,
  76,
  70,
  -3,
  9,
  55,
  215,
  19,
  4,
  86],
 ['Shape of You',
  'Ed Shee

## Map Operation

Split data into individual parts and create key-value pairs

In [6]:
# TODO
import pandas as pd

# 2. Convert to a list
data_list = df.values.tolist()

# 3. Convert the list to an RDD
rdd = sc.parallelize(data_list)

# 4. Map operation: turn the first five columns into a dictionary
#    (keys follow the CSV header: title, artist, top genre, year, beats.per.minute)
rdd_mapped = rdd.map(lambda x: {
    "title": x[0],
    "artist": x[1],
    "top_genre": x[2],
    "year": x[3],
    "beats_per_minute": x[4]
})

# 5. Check the output
rdd_mapped.take(5)



[{'title': 'Blinding Lights',
  'artist': 'The Weeknd',
  'top_genre': 'canadian contemporary r&b',
  'year': 2020,
  'beats_per_minute': 171},
 {'title': 'Watermelon Sugar',
  'artist': 'Harry Styles',
  'top_genre': 'pop',
  'year': 2019,
  'beats_per_minute': 95},
 {'title': 'Mood (feat. iann dior)',
  'artist': '24kGoldn',
  'top_genre': 'cali rap',
  'year': 2021,
  'beats_per_minute': 91},
 {'title': 'Someone You Loved',
  'artist': 'Lewis Capaldi',
  'top_genre': 'pop',
  'year': 2019,
  'beats_per_minute': 110},
 {'title': 'Perfect',
  'artist': 'Ed Sheeran',
  'top_genre': 'pop',
  'year': 2017,
  'beats_per_minute': 95}]

## Reduce Operation

Reduce your key-value pairs

In [7]:
# TODO
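import pandas as pd

data_list = df.values.tolist()

rdd = sc.parallelize(data_list)

# Build (artist, value) pairs. Column 3 of the CSV is "year", so the totals
# below are sums of release years per artist, not stream counts.
artist_streams_rdd = rdd.map(lambda x: (x[1], int(x[3])))

# Sum the values of each key
artist_total_streams = artist_streams_rdd.reduceByKey(lambda a, b: a + b)

artist_total_streams.take(10)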


[('The Weeknd', 8066),
 ('Harry Styles', 2019),
 ('24kGoldn', 2021),
 ('Lewis Capaldi', 2019),
 ('Ed Sheeran', 10081),
 ('Imagine Dragons', 8058),
 ('Billie Eilish', 6056),
 ('Post Malone', 14123),
 ('Maroon 5', 6054),
 ('The Chainsmokers', 6049)]
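
What `reduceByKey` computes can be pictured with a small plain-Python stand-in. This is only a sketch of the semantics on a single machine: the helper `reduce_by_key` and the toy `pairs` are hypothetical, and Spark itself combines values per partition and across nodes, which is why the function passed to `reduceByKey` should be associative and commutative.

```python
def reduce_by_key(pairs, func):
    """Combine all values that share a key using func (local illustration only)."""
    result = {}
    for key, value in pairs:
        # First value for a key is stored as-is; later values are merged in.
        result[key] = func(result[key], value) if key in result else value
    return result


# Hypothetical key-value pairs, not taken from the dataset.
pairs = [("a", 1), ("b", 2), ("a", 3), ("c", 4), ("b", 5)]

totals = reduce_by_key(pairs, lambda x, y: x + y)
print(totals)  # {'a': 4, 'b': 7, 'c': 4}
```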

## Collect Results

Spark transformations such as `map` and `reduceByKey` are lazy: they only describe the computation, and Spark runs it when an action such as `collect()` is called. Show what you calculated.
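
Lazy evaluation can be observed in plain Python as well, where the built-in `map` returns an iterator that does no work until it is consumed. The sketch below is an analogy rather than Spark code; the function `double` and the `calls` list are made up to record when the work actually happens.

```python
calls = []


def double(x):
    calls.append(x)  # record that the function actually ran
    return x * 2


numbers = [1, 2, 3]

# Building the pipeline does no work yet: map() returns a lazy iterator,
# comparable to defining a transformation.
pipeline = map(double, numbers)
calls_before_action = list(calls)
print(calls_before_action)  # []

# Consuming the iterator plays the role of an action such as collect().
result = list(pipeline)
print(calls)   # [1, 2, 3]
print(result)  # [2, 4, 6]
```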

In [8]:
# TODO

results = artist_total_streams.collect()
for artist, total in results:
    print(f"{artist}: {total}")


The Weeknd: 8066
Harry Styles: 2019
24kGoldn: 2021
Lewis Capaldi: 2019
Ed Sheeran: 10081
Imagine Dragons: 8058
Billie Eilish: 6056
Post Malone: 14123
Maroon 5: 6054
The Chainsmokers: 6049
James Arthur: 2016
Juice WRLD: 2018
John Legend: 2013
Ariana Grande: 4038
XXXTENTACION: 4035
Travis Scott: 4034
Avicii: 2013
DaBaby: 2020
Twenty One Pilots: 4031
SAINt JHN: 2019
Roddy Ricch: 2019
Justin Bieber: 6045
Lady Gaga: 2018
Lil Uzi Vert: 2017
Hozier: 2014
Marshmello: 2018
OneRepublic: 2013
Macklemore & Ryan Lewis: 2012
Shawn Mendes: 6051
Drake: 4034
Dua Lipa: 4037
Queen: 1975
Kendrick Lamar: 2017
DJ Snake: 2016
Sam Smith: 4031
Mark Ronson: 2015
Lauv: 2018
The Killers: 2004
Tones And I: 2019
Camila Cabello: 2018
French Montana: 2017
Vance Joy: 2014
Bruno Mars: 2016
Wiz Khalifa: 2015
Calvin Harris: 2018
5 Seconds of Summer: 2018
Lil Nas X: 2019
Jason Mraz: 2008
Charlie Puth: 4034
Lukas Graham: 2016
Cardi B: 2018
Luis Fonsi: 4038
Oasis: 1995
benny blanco: 2018
OMI: 2015
Sia: 2016
Alan Walker: 201

## Save Results

In [10]:
# TODO
import os
import shutil

# saveAsTextFile fails if the target directory exists, so remove it first
output_path = "ergebnisse/gesamt_streams_pro_artist"
if os.path.exists(output_path):
    shutil.rmtree(output_path)
artist_total_streams.saveAsTextFile(output_path)

25/06/20 20:07:59 WARN FileSystem: Cannot load filesystem: java.util.ServiceConfigurationError: org.apache.hadoop.fs.FileSystem: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem Unable to get public no-arg constructor
25/06/20 20:07:59 WARN FileSystem: java.lang.NoClassDefFoundError: com/google/api/client/http/HttpRequestInitializer
25/06/20 20:07:59 WARN FileSystem: java.lang.ClassNotFoundException: com.google.api.client.http.HttpRequestInitializer


# Spark DataFrame

A Spark DataFrame is a distributed collection of data organized into named columns, designed for efficient data manipulation and analysis. It provides a high-level abstraction for structured data and is used for tasks such as data ingestion, transformation, querying, and analysis.

## Load Data into DataFrame

In [11]:
# TODO
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Load the CSV into a Spark DataFrame
df = spark.read.option("header", True).csv("/home/bdeng-g6/notebooks/spark/Top_100_most_Streamed.csv")


## View DataFrame Schema

In [12]:
# TODO
df.printSchema()


root
 |-- title: string (nullable = true)
 |-- artist: string (nullable = true)
 |-- top genre: string (nullable = true)
 |-- year: string (nullable = true)
 |-- beats.per.minute: string (nullable = true)
 |-- energy: string (nullable = true)
 |-- danceability: string (nullable = true)
 |-- loudness.dB: string (nullable = true)
 |-- liveness: string (nullable = true)
 |-- valance: string (nullable = true)
 |-- length: string (nullable = true)
 |-- acousticness: string (nullable = true)
 |-- speechiness: string (nullable = true)
 |-- popularity: string (nullable = true)
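
Every column in this schema is a string because the CSV was read without a schema and without schema inference. Spark casts implicitly in many comparisons, but sorting a string column is lexicographic, so explicit numeric types are usually safer. The plain-Python sketch below shows the pitfall in its rawest form; it uses the standard `csv` module as a stand-in, with three title and popularity pairs copied from the notebook output and all other columns left out for brevity.

```python
import csv
import io

# Three rows from the notebook output; only two columns kept (assumption for brevity).
raw = io.StringIO(
    "title,popularity\n"
    "Blinding Lights,91\n"
    "Watermelon Sugar,88\n"
    "Perfect,86\n"
)
rows = list(csv.DictReader(raw))

# The csv module returns every field as str, like the all-string schema above.
print(type(rows[0]["popularity"]))  # <class 'str'>

# String comparison is lexicographic, so "9" compares greater than "100".
print("9" > "100")            # True
print(int("9") > int("100"))  # False

# Cast explicitly before filtering on a numeric threshold.
typed = [{**row, "popularity": int(row["popularity"])} for row in rows]
popular = [row["title"] for row in typed if row["popularity"] > 90]
print(popular)  # ['Blinding Lights']
```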



## View DataFrame Data

In [13]:
# TODO
df.show()


+--------------------+----------------+--------------------+----+----------------+------+------------+-----------+--------+-------+------+------------+-----------+----------+
|               title|          artist|           top genre|year|beats.per.minute|energy|danceability|loudness.dB|liveness|valance|length|acousticness|speechiness|popularity|
+--------------------+----------------+--------------------+----+----------------+------+------------+-----------+--------+-------+------+------------+-----------+----------+
|     Blinding Lights|      The Weeknd|canadian contempo...|2020|             171|    73|          51|         -6|       9|     33|   200|           0|          6|        91|
|    Watermelon Sugar|    Harry Styles|                 pop|2019|              95|    82|          55|         -4|      34|     56|   174|          12|          5|        88|
|Mood (feat. iann ...|        24kGoldn|            cali rap|2021|              91|    72|          70|         -4|      32|  

## Filter Data

Perform a filter operation on a column

In [14]:
# TODO
from pyspark.sql.functions import col

filtered_df = df.filter(col("popularity") > 90)

filtered_df.show()



+---------------+----------+--------------------+----+----------------+------+------------+-----------+--------+-------+------+------------+-----------+----------+
|          title|    artist|           top genre|year|beats.per.minute|energy|danceability|loudness.dB|liveness|valance|length|acousticness|speechiness|popularity|
+---------------+----------+--------------------+----+----------------+------+------------+-----------+--------+-------+------+------------+-----------+----------+
|Blinding Lights|The Weeknd|canadian contempo...|2020|             171|    73|          51|         -6|       9|     33|   200|           0|          6|        91|
+---------------+----------+--------------------+----+----------------+------+------------+-----------+--------+-------+------+------------+-----------+----------+



## Group By and Aggregate

Perform a group-by and aggregate operation

In [15]:
# TODO
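from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count
import os
import shutil

# Cast popularity to an integer so it is aggregated as a number
df = df.withColumn("popularity", col("popularity").cast("int"))

# Aggregate per artist. Keep the DataFrame and display it separately,
# because show() returns None.
grouped_df = df.groupBy("artist").agg(
    sum("popularity").alias("total_popularity"),
    avg("popularity").alias("avg_popularity"),
    count("*").alias("track_count")
)
grouped_df.show()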


+--------------------+----------------+-----------------+-----------+
|              artist|total_popularity|   avg_popularity|track_count|
+--------------------+----------------+-----------------+-----------+
|           Lil Nas X|              80|             80.0|          1|
|Macklemore & Ryan...|              82|             82.0|          1|
|          Juice WRLD|              84|             84.0|          1|
|    The Chainsmokers|             245|81.66666666666667|          3|
|           SAINt JHN|              83|             83.0|          1|
|        benny blanco|              76|             76.0|          1|
|        XXXTENTACION|             167|             83.5|          2|
|         The Killers|              81|             81.0|          1|
|               Oasis|              77|             77.0|          1|
|         Tones And I|              80|             80.0|          1|
|           Sam Smith|             161|             80.5|          2|
|           Vance Jo

## Save DataFrame to Parquet

In [17]:
# TODO
df.write.mode("overwrite").parquet("output/spotify_data.parquet")
