In [11]:
from pyspark.sql import SparkSession
from pyspark import SparkConf

sparkConf = SparkConf()
sparkConf.setMaster("spark://spark-master:7077")
sparkConf.setAppName("SpotifyGenreUpdate")
sparkConf.set("spark.driver.memory", "2g")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.driver.cores", "1")
# create the spark session, which is the entry point to Spark SQL engine.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

#loading data locally --> upload data to jupyther lab
df_local = spark.read.format("csv").option("header", "true").option('delimiter',',') \
       .load("/home/jovyan/data/genre_repeated.csv")
df_local.printSchema()

df_local.show()  

#transformation
df_local = df_local.distinct()
df_local.show()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)

+---+------------+
| id|        name|
+---+------------+
|  1|      'Rock'|
|  2|       'Pop'|
|  3|   'Hip-hop'|
|  4|       'Rap'|
|  5|       'R&B'|
|  6| 'Classical'|
|  7|    'Techno'|
|  8|      'Jazz'|
|  9|      'Folk'|
| 10|   'Country'|
| 11|     'Metal'|
| 12|     'House'|
|  1|      'Rock'|
|  1|      'Rock'|
|  8|      'Jazz'|
|  8|      'Jazz'|
|  8|      'Jazz'|
| 11|     'Metal'|
| 11|     'Metal'|
| 11|     'Metal'|
+---+------------+
only showing top 20 rows

+---+------------+
| id|        name|
+---+------------+
|  8|      'Jazz'|
|  5|       'R&B'|
| 11|     'Metal'|
|  1|      'Rock'|
| 13|     'Polka'|
|  3|   'Hip-hop'|
|  2|       'Pop'|
|  6| 'Classical'|
|  7|    'Techno'|
|  9|      'Folk'|
| 10|   'Country'|
| 12|     'House'|
|  4|       'Rap'|
+---+------------+



In [12]:
# Load data from BigQuery.
df_bigquery = spark.read \
  .format("bigquery") \
  .load("de-2022-366209.spotify.genre")    # project_id.datatset.tablename. Use your project id
df_bigquery.printSchema()
df_bigquery.show(4)

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)

+---+----+
| id|name|
+---+----+
+---+----+



In [13]:
# Select missing genre in Big Query
join_ = df_local['id'] == df_bigquery['id']  
df_miss = df_local.join(df_bigquery, join_, 'left_anti')
df_miss.show(4)
df_miss.printSchema()

+---+--------+
| id|    name|
+---+--------+
|  8|  'Jazz'|
|  5|   'R&B'|
| 11| 'Metal'|
|  1|  'Rock'|
+---+--------+
only showing top 4 rows

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



In [14]:
# Use the Cloud Storage bucket for temporary BigQuery export data used by the connector.
bucket = "de_temp_assignment2"  # own bucket 
spark.conf.set('temporaryGcsBucket', bucket)
# Setup hadoop fs configuration for schema gs://
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")

# Saving the data to BigQuery
df_miss.write.format('bigquery') \
  .option('table', 'de-2022-366209.spotify.genre') \
  .mode("append") \
  .save()


In [15]:
spark.stop()

In [16]:
#Supported join types include: 'inner', 'outer', 'full', 'fullouter', 'full_outer', 'leftouter', 'left', 'left_outer', 'rightouter', 'right', 'right_outer', 'leftsemi', 'left_semi', 'semi', 'leftanti', 'left_anti', 'anti', 'cross'.