In [1]:
import os
# SPARK_HOME must be set before findspark.init() so it can locate the Spark
# distribution.
# NOTE(review): machine-specific absolute path — adjust for other environments.
os.environ["SPARK_HOME"] = "/home/osboxes/spark/spark-2.4.0-bin-hadoop2.7/"

import findspark
findspark.init()
findspark.find()
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, levenshtein, to_timestamp, to_date

# Single-JVM local master.
conf = pyspark.SparkConf().setAppName('appName').setMaster('local')
# Alternative: run against the standalone cluster instead of local mode.
# NOTE(review): port 8080 is normally the standalone master's web UI; the
# master URL usually uses 7077 — confirm before enabling.
#conf = pyspark.SparkConf().setAppName('appName').setMaster('spark://192.168.11.128:8080')

# getOrCreate() reuses an already running context/session, so this cell can be
# re-executed without failing on "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Hive support is not enabled; to enable it, add .enableHiveSupport() to the
# builder chain above.

In [2]:
from pyspark.sql.types import *
def set_Schema(cols):
    """Build a StructType from a sequence of (name, data type) entries.

    Each entry of ``cols`` is indexed positionally: item 0 is passed to
    StructField as the field name and item 1 as its data type.

    Returns:
        A StructType whose fields follow the order of ``cols``.
    """
    return StructType(fields=[StructField(spec[0], spec[1]) for spec in cols])

In [3]:
from pyspark.sql.functions import col, split, levenshtein, to_timestamp, to_date
def persistToStagingDB(csv_file, table_name, table_schema, mode=None):
    """Read a CSV file with an explicit schema and write it to the staging DB.

    Args:
        csv_file: Path of the CSV file to load (first line is the header).
        table_name: Target table in the ``gig_stg`` Postgres database. Two
            names trigger extra transformations before the write:
            ``'spotify_top_200_weekly'`` and ``'songkick_uk_events'``.
        table_schema: StructType applied to the CSV columns.
        mode: Optional save mode forwarded to ``DataFrameWriter.jdbc``
            (e.g. ``'append'`` or ``'overwrite'``). ``None`` keeps Spark's
            default, which fails when the table already exists.

    Returns:
        True when the JDBC write succeeded, False when it raised. The
        failure is logged with its traceback instead of being discarded.

    The database user/password default to ``postgres``/``postgres`` and can be
    overridden with the GIG_STG_DB_USER / GIG_STG_DB_PASSWORD environment
    variables.
    """
    import logging
    import os

    df = spark.read.csv(csv_file, header=True, encoding="UTF-8", quote='"', escape='"', schema=table_schema)
    # Other options that can come in handy during csv import are:
    # multiLine=True, ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True,
    # encoding="UTF-8", sep=',', maxColumns=2, inferSchema=True

    if table_name == 'spotify_top_200_weekly':
        # Split date_range on "--" once: item 0 becomes start_date, item 1
        # becomes end_date, both cast to DateType.
        date_parts = split(col("date_range"), "--")
        df = df.withColumn("start_date", date_parts.getItem(0).cast(DateType()))
        df = df.withColumn("end_date", date_parts.getItem(1).cast(DateType()))

    elif table_name == 'songkick_uk_events':
        # Keep one row per (event_id, mbid) pair, then parse the string
        # date/time columns using the given patterns.
        df = df.dropDuplicates(subset=['event_id', 'mbid'])
        df = df.withColumn("start_date", to_date("start_date", "d/M/y"))
        df = df.withColumn("start_time", to_timestamp("start_time", "H:mm:ss"))

    connection_properties = {
        "user": os.environ.get("GIG_STG_DB_USER", "postgres"),
        "password": os.environ.get("GIG_STG_DB_PASSWORD", "postgres"),
    }

    # Persist to Postgres DB - available after session closes
    try:
        df.write.jdbc("jdbc:postgresql://osboxes:5432/gig_stg", table_name,
                      mode=mode, properties=connection_properties)
        return True
    except Exception:
        # Keep the boolean contract but record why the write failed.
        logging.getLogger(__name__).exception(
            "Failed to persist %s to table %s", csv_file, table_name)
        return False
        

In [7]:
# Stage activeArtists.csv into the dim_mbartist table.
# Column names are kept lowercase; see
# https://stackoverflow.com/questions/56145841/pandas-adding-double-quotes-on-column-names
csv_file = '../datasets/activeArtists.csv'
table_name = 'dim_mbartist'

columns = [
    ('gid', StringType()),
    ('name', StringType()),
    ('type', DoubleType()),
    ('gender', DoubleType()),
]
simple_struct = set_Schema(columns)

# The cell displays the boolean returned by the load.
persistToStagingDB(csv_file, table_name, simple_struct)


True

In [None]:
# Stage spotify_top_200_weekly.csv into the table of the same name.
# Column names are kept lowercase; see
# https://stackoverflow.com/questions/56145841/pandas-adding-double-quotes-on-column-names
csv_file = '../datasets/spotify_top_200_weekly.csv'
table_name = 'spotify_top_200_weekly'

# NOTE(review): FloatType is single precision; if `streams` holds large exact
# counts a wider type may be needed — confirm against the data.
columns = [
    ('week_position', IntegerType()),
    ('track_name', StringType()),
    ('artist', StringType()),
    ('streams', FloatType()),
    ('date_range', StringType()),
    ('region', StringType()),
    ('spotify_id', StringType()),
]
simple_struct = set_Schema(columns)

# The cell displays the boolean returned by the load.
persistToStagingDB(csv_file, table_name, simple_struct)

In [None]:
# Stage spotify_track_details.csv into the table of the same name.
# Column names are kept lowercase; see
# https://stackoverflow.com/questions/56145841/pandas-adding-double-quotes-on-column-names
csv_file = '../datasets/spotify_track_details.csv'
table_name = 'spotify_track_details'

columns = [
    ('track_spotify_id', StringType()),
    ('artist_spotify_id', StringType()),
    ('artist_mbid', StringType()),
    ('artist_name', StringType()),
    ('artist_fychart_name', StringType()),
    ('track_url', StringType()),
    ('track_popularity', FloatType()),
    ('track_duration_ms', IntegerType()),
    ('track_is_local', BooleanType()),
    ('album_id', StringType()),
    ('album_track_number', IntegerType()),
    ('album_release_date', StringType()),
    ('album_type', StringType()),
]
simple_struct = set_Schema(columns)

# The cell displays the boolean returned by the load.
persistToStagingDB(csv_file, table_name, simple_struct)

In [16]:
# Stage the cleaned ukVenues.csv into the songkick_uk_venues table.
# NOTE(review): the recorded run of this cell returned False. A later
# exploration cell reads '../datasets/ukVenues.csv' (no clean/ directory) and
# infers venueID and venueCapacity as strings — confirm the path and the
# numeric column types used here.
csv_file = '../datasets/clean/ukVenues.csv'
table_name = 'songkick_uk_venues'

columns = [
    ('venue_id', IntegerType()),
    ('name', StringType()),
    ('street', StringType()),
    ('post_code', StringType()),
    ('city', StringType()),
    ('country', StringType()),
    ('capacity', FloatType()),
    ('website', StringType()),
    ('phone', StringType()),
]
simple_struct = set_Schema(columns)

# The cell displays the boolean returned by the load.
persistToStagingDB(csv_file, table_name, simple_struct)

False

In [28]:
# Stage ukEvents.csv into the songkick_uk_events table.
# start_date and start_time are read as strings here; persistToStagingDB
# converts them with to_date / to_timestamp for this table name.
csv_file = '../datasets/ukEvents.csv'
table_name = 'songkick_uk_events'

columns = [
    ('event_id', IntegerType()),
    ('event_name', StringType()),
    ('event_type', StringType()),
    ('uri', StringType()),
    ('age_restriction', StringType()),
    ('mbid', StringType()),
    ('venue_id', DoubleType()),
    ('start_date', StringType()),
    ('start_time', StringType()),
    ('country', StringType()),
    ('flagged_as_ended', BooleanType()),
    # ('popularity', DoubleType()) is currently left out of the schema.
]
simple_struct = set_Schema(columns)

# The cell displays the boolean returned by the load.
persistToStagingDB(csv_file, table_name, simple_struct)

True

In [None]:
# Interactive reference lookup: prints the built-in help for pyspark.sql.types.
# Its result is not used by any other cell shown in this notebook.
help(pyspark.sql.types)

In [12]:
# Load activeArtists.csv with an inferred schema for ad-hoc inspection.
df = spark.read.csv(
    '../datasets/activeArtists.csv',
    header=True,
    encoding="UTF-8",
    quote='"',
    escape='"',
    inferSchema='true',
)


In [5]:
# Load ukEvents.csv with an inferred schema for ad-hoc inspection; this
# rebinds the shared name `df`.
df = spark.read.csv(
    '../datasets/ukEvents.csv',
    header=True,
    encoding="UTF-8",
    quote='"',
    escape='"',
    inferSchema='true',
)

In [6]:
# Keep one row per (eventID, artistMBID) pair. These are the raw CSV header
# names shown by printSchema(), not the lowercase staging-table column names.
df_unique = df.dropDuplicates(subset = ['eventID', 'artistMBID'])

In [7]:
# Number of rows left after de-duplication (recorded output: 69335).
df_unique.count()

69335

In [13]:
# Row count of whichever frame `df` currently holds.
# NOTE(review): the recorded output (1514086) is from execution In [13], which
# followed the activeArtists.csv load (In [12]); read top to bottom, `df` was
# last assigned from ukEvents.csv instead — confirm the intended frame.
df.count()

1514086

In [17]:
# De-duplicate on `gid` and rebind `df` to the result.
# NOTE(review): `gid` appears in the activeArtists schema, not in the ukEvents
# schema printed further down; in top-to-bottom order `df` holds ukEvents at
# this point, so this cell presumably relies on In [12] having run last — verify.
df = df.dropDuplicates(subset = ['gid'])

In [18]:
# Row count after the `gid` de-duplication; the recorded value (1514086)
# equals the count recorded before it.
df.count()

1514086

In [9]:
# Print the column names and inferred types of `df`; the recorded output is
# the ukEvents.csv schema (execution In [9]).
df.printSchema()

root
 |-- eventID: integer (nullable = true)
 |-- eventName: string (nullable = true)
 |-- eventType: string (nullable = true)
 |-- eventURI: string (nullable = true)
 |-- eventAgeRestriction: string (nullable = true)
 |-- artistMBID: string (nullable = true)
 |-- venueID: double (nullable = true)
 |-- eventStartDate: string (nullable = true)
 |-- eventStartTime: string (nullable = true)
 |-- venueCountry: string (nullable = true)
 |-- eventFlaggedAsEnded: boolean (nullable = true)



In [10]:
# Load ukVenues.csv with an inferred schema for ad-hoc inspection; this
# rebinds the shared name `df`.
df = spark.read.csv(
    '../datasets/ukVenues.csv',
    header=True,
    encoding="UTF-8",
    quote='"',
    escape='"',
    inferSchema='true',
)

In [9]:
# Print the inferred schema of `df`; the recorded output is the ukVenues.csv
# schema, in which every column (including venueID and venueCapacity) was
# inferred as string.
# NOTE(review): this cell's execution count (In [9]) is lower than that of the
# ukVenues load above it (In [10]) — the notebook was run out of order.
df.printSchema()

root
 |-- venueID: string (nullable = true)
 |-- venueName: string (nullable = true)
 |-- venueStreet: string (nullable = true)
 |-- venuePostcode: string (nullable = true)
 |-- venueCity: string (nullable = true)
 |-- venueCountry: string (nullable = true)
 |-- venueCapacity: string (nullable = true)
 |-- venueWebsite: string (nullable = true)
 |-- venuePhone: string (nullable = true)



In [11]:
# Row count of `df` (recorded output: 7031, executed as In [11] right after
# the ukVenues.csv load in In [10]).
df.count()

7031

## Options to pass to `spark.read.csv` so that quoted strings containing commas are read as a single field

In [None]:

#df_weeklyTop200 = spark.read.csv('../datasets/spotify_top_200_weekly.csv', header=True, nullValue='', schema=simple_struct)



In [None]:
# Show the rows whose spotify_id equals the given track id, printed vertically
# (one field per line) and without truncating long values.
# NOTE(review): df_weeklyTop200 is only assigned in a commented-out line in the
# cell above; on a fresh kernel this raises NameError unless it is defined
# elsewhere — confirm.
df_weeklyTop200.filter(df_weeklyTop200.spotify_id == '5mZXWEH2eh8zMZGCxT5aW0').show(vertical=True, truncate=False)

## Persist dataframe to temporary and permanent storage

In [None]:
# Register the frame as a temporary view so it can be queried with SQL.
# The view is temporary: it is not available once the session closes.
df_weeklyTop200.createOrReplaceTempView('spotify_top_200_weekly')
# Sanity check: count the rows visible through the view.
spark.sql('select count(1) from spotify_top_200_weekly').show()


In [None]:
# Persist the frame as the table 'spotify_top_200_weekly', overwriting any
# existing table of that name. The data remains available after the session
# closes; the next cell reads it back with
# spark.read.parquet('spark-warehouse/spotify_top_200_weekly/').
# NOTE(review): Hive support is not enabled in the setup cell, so presumably
# only the files under spark-warehouse/ outlive the session, not a Hive
# metastore entry — confirm.
df_weeklyTop200.write.mode('Overwrite').saveAsTable('spotify_top_200_weekly')

In [None]:
# Read the saved table's files back from the warehouse directory and count the
# rows, as a check that the data was persisted.
spark.read.parquet('spark-warehouse/spotify_top_200_weekly/').count()

In [None]:
# Shut down the SparkContext created in the setup cell; run this last.
sc.stop()