In [317]:
import configparser
import os
from datetime import datetime

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import udf, col, first, asc, desc, monotonically_increasing_id, row_number
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import StructType,StructField
from pyspark.sql.types import StringType, IntegerType, LongType, DoubleType
from pyspark.sql.types import TimestampType, DateType

In [74]:
# Load AWS credentials from the local dl.cfg file and export them as
# environment variables for the S3 access used by this notebook.
config = configparser.ConfigParser()
if not config.read('dl.cfg'):
    # ConfigParser.read silently skips files it cannot open and returns the list
    # of files it did read; fail loudly here instead of with a KeyError below.
    raise FileNotFoundError("dl.cfg not found in the current working directory")

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

In [75]:
def unzip(pathFile, dest):
    """Extract every member of the zip archive at ``pathFile`` into directory ``dest``.

    Args:
        pathFile: path of the .zip archive to read.
        dest: directory the members are extracted into.
    """
    import zipfile

    with zipfile.ZipFile(pathFile, mode='r') as archive:
        archive.extractall(path=dest)

In [76]:
# Spark session with the hadoop-aws package on the classpath (for s3a:// paths).
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

In [231]:
from pyspark.sql.types import StructType,StructField
from pyspark.sql.types import StringType, IntegerType, LongType, DoubleType, TimestampType
    
# Explicit schema for the song metadata JSON files (all fields nullable).
songSchema = StructType([
    StructField(fieldName, fieldType)
    for fieldName, fieldType in [
        ('artist_id', StringType()),
        ('artist_latitude', DoubleType()),
        ('artist_location', StringType()),
        ('artist_longitude', DoubleType()),
        ('artist_name', StringType()),
        ('duration', DoubleType()),
        ('num_songs', IntegerType()),
        ('song_id', StringType()),
        ('title', StringType()),
        ('year', IntegerType()),
    ]
])

In [19]:
# Extract into 'data'; presumably the archive carries its own top-level song_data/ folder.
unzip(pathFile='data/song-data.zip', dest='data')

In [104]:
song_data = 'data/song_data/*/*/*/*.json'

# Enforce the declared songSchema rather than letting Spark infer types from the JSON.
dfSong = spark.read.schema(songSchema).json(song_data)
dfSong.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: integer (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)



In [105]:
# Number of song records loaded (runs a Spark job over the song JSON files).
dfSong.count()

71

In [59]:
# Song dimension built from the song metadata in `dfSong`. The original cell read
# `df`, which is the *log* frame and is not defined until a later cell.
# Rows missing either key are dropped; duplicates are removed across all columns.
songTable = (
    dfSong.dropna(how='any', subset=['artist_id', 'song_id'])
    .select(['song_id', 'title', 'artist_id', 'year', 'duration'])
    .dropDuplicates()
)

In [277]:
# Persist the song dimension, one directory level per year and then per artist_id.
songTable.write.parquet('songs.parquet', mode='overwrite', partitionBy=['year', 'artist_id'])

In [65]:
# Artist dimension built from the song metadata in `dfSong`. The original cell read
# `df`, which is the *log* frame and is not defined until a later cell.
# dropDuplicates() compares all columns, so an artist whose songs carry different
# location/coordinates can still appear more than once.
artistTable = (
    dfSong.dropna(how='any', subset=['artist_id'])
    .select(
        col('artist_id'),
        col('artist_name').alias('name'),
        col('artist_location').alias('location'),
        # NOTE(review): 'lattitude' is a misspelling, kept because it is the
        # column name written to artists.parquet; rename together with consumers.
        col('artist_latitude').alias('lattitude'),
        col('artist_longitude').alias('longitude'),
    )
    .dropDuplicates()
)

In [285]:
# Persist the artist dimension, replacing any previous output.
artistTable.write.parquet('artists.parquet', mode='overwrite')

In [68]:
# Bring only a small sample to the driver for display.
artistTable.limit(num=10).toPandas()

Unnamed: 0,artist_id,name,location,lattitude,longitude
0,ARPFHN61187FB575F6,Lupe Fiasco,"Chicago, IL",41.88415,-87.63241
1,AR3JMC51187B9AE49D,Backstreet Boys,"Orlando, FL",28.53823,-81.37739
2,ARXR32B1187FB57099,Gob,,,
3,AROUOZZ1187B9ABE51,Willie Bobo,"New York, NY [Spanish Harlem]",40.79195,-73.94512
4,AROGWRA122988FEE45,Christos Dantis,,,
5,ARAJPHH1187FB5566A,The Shangri-Las,"Queens, NY",40.7038,-73.83168
6,AREDL271187FB40F44,Soul Mekanik,,,
7,ARBGXIG122988F409D,Steel Rain,California - SF,37.77916,-122.42005
8,ARGSAFR1269FB35070,Blingtones,,,
9,ARD842G1187B997376,Blue Rodeo,"Toronto, Ontario, Canada",43.64856,-79.38533


In [70]:
# The log archive is extracted into its own folder (unlike the song archive above).
unzip(pathFile='data/log-data.zip', dest='data/log_data')

In [140]:
# Explicit schema for the event-log JSON files (all fields nullable).
logSchema = StructType([
    StructField(fieldName, fieldType)
    for fieldName, fieldType in [
        ('artist', StringType()),
        ('auth', StringType()),
        ('firstName', StringType()),
        ('gender', StringType()),
        ('itemInSession', LongType()),
        ('lastName', StringType()),
        ('length', DoubleType()),
        ('level', StringType()),
        ('location', StringType()),
        ('method', StringType()),
        ('page', StringType()),
        ('registration', DoubleType()),
        ('sessionId', LongType()),
        ('song', StringType()),
        ('status', IntegerType()),
        ('ts', LongType()),
        ('userAgent', StringType()),
        ('userId', StringType()),
    ]
])

In [172]:
log_data = 'data/log_data/*.json'
# Read every extracted log file with the declared logSchema.
df = spark.read.schema(logSchema).json(log_data)

In [173]:
# Confirm the log frame carries the types declared in logSchema.
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [177]:
# Keep only song-play events and turn the string userId into an integer
# (values that are not valid integers become null).
df = (
    df.filter(col('page') == 'NextSong')
    .withColumn('userId', col('userId').cast(IntegerType()))
)

In [212]:
# First attempt at the user dimension: distinct over all five columns, so a user
# whose `level` changed appears once per level. Superseded by the next cell.
userTable = df.selectExpr(
    'userId AS user_id',
    'firstName AS first_name',
    'lastName AS last_name',
    'gender',
    'level',
).distinct()

In [219]:
# User dimension: exactly one row per non-null user, taken from that user's most
# recent NextSong event (highest `ts`), so `level` reflects the latest state.
# Per-column first() aggregates were non-deterministic and could combine values
# from different events; a row_number window picks one whole event instead.
userTable = (
    df.where(col('userId').isNotNull())
    .withColumn(
        '_rn',
        row_number().over(Window.partitionBy('userId').orderBy(desc('ts'))),
    )
    .where(col('_rn') == 1)
    .select(
        col('userId').alias('user_id'),
        col('firstName').alias('first_name'),
        col('lastName').alias('last_name'),
        col('gender'),
        col('level'),
    )
)

In [220]:
# Sanity checks: every user_id should appear once, and a sample user should look right.
userTable.groupBy('user_id').count().sort(col('count').desc()).show()
userTable.filter(col('user_id') == 88).show()

+-------+-----+
|user_id|count|
+-------+-----+
|     85|    1|
|     65|    1|
|     53|    1|
|     78|    1|
|     34|    1|
|    101|    1|
|     81|    1|
|     28|    1|
|     76|    1|
|     26|    1|
|     27|    1|
|     44|    1|
|     12|    1|
|     91|    1|
|     22|    1|
|     47|    1|
|     52|    1|
|     13|    1|
|     16|    1|
|     86|    1|
+-------+-----+
only showing top 20 rows

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     88|  Mohammad|Rodriguez|     M| free|
+-------+----------+---------+------+-----+



In [260]:
# UDF: epoch milliseconds -> datetime, via datetime.fromtimestamp (local time of
# the Python worker). `ts` is a nullable column, so null input yields null
# instead of raising TypeError inside the worker.
get_timestamp = udf(
    lambda ms: None if ms is None else datetime.fromtimestamp(ms / 1000.0),
    TimestampType(),
)

In [262]:
# Derive a proper timestamp column from the millisecond `ts` field.
df = df.withColumn('timestamp', get_timestamp(col('ts')))

In [266]:
# UDF: epoch milliseconds -> calendar date, via datetime.fromtimestamp (local time
# of the Python worker). Null input yields null instead of raising TypeError.
get_datetime = udf(
    lambda ms: None if ms is None else datetime.fromtimestamp(ms / 1000.0).date(),
    DateType(),
)

In [267]:
# Derive the calendar date of each event from the millisecond `ts` field.
df = df.withColumn('datetime', get_datetime(col('ts')))

In [268]:
# Spot-check the derived time columns against the raw `ts` for a few events.
df.select(['ts', 'timestamp', 'datetime', 'userId', 'artist', 'song']).limit(20).toPandas()

Unnamed: 0,ts,timestamp,datetime,userId,artist,song
0,1542241826796,2018-11-15 00:30:26.796,2018-11-15,26,Harmonia,Sehr kosmisch
1,1542242481796,2018-11-15 00:41:21.796,2018-11-15,26,The Prodigy,The Big Gundown
2,1542242741796,2018-11-15 00:45:41.796,2018-11-15,26,Train,Marry Me
3,1542253449796,2018-11-15 03:44:09.796,2018-11-15,61,Sony Wonder,Blackbird
4,1542260935796,2018-11-15 05:48:55.796,2018-11-15,80,Van Halen,Best Of Both Worlds (Remastered Album Version)
5,1542261224796,2018-11-15 05:53:44.796,2018-11-15,80,Magic Sam,Call Me If You Need Me
6,1542261356796,2018-11-15 05:55:56.796,2018-11-15,80,Edward Sharpe & The Magnetic Zeros,Home
7,1542261662796,2018-11-15 06:01:02.796,2018-11-15,80,Usher featuring will.i.am,OMG
8,1542262057796,2018-11-15 06:07:37.796,2018-11-15,80,Helen Reddy,Candle On The Water
9,1542262233796,2018-11-15 06:10:33.796,2018-11-15,80,Taylor Swift,Our Song


In [265]:
# NOTE: the output below came from execution 265, which ran before the 'datetime'
# column was added (execution 267), so only 'timestamp' is listed. Re-run to refresh.
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [273]:
# Time dimension: start_time (the event timestamp formatted as
# 'yyyy-MM-dd HH:mm:ss' and cast back, which drops milliseconds) plus its calendar
# parts, de-duplicated across all six columns.
timeDf = (
    df.select(
        date_format('timestamp', 'yyyy-MM-dd HH:mm:ss').cast(TimestampType()).alias('start_time'),
        hour('timestamp').alias('hour'),
        dayofmonth('timestamp').alias('day'),
        weekofyear('timestamp').alias('week'),
        month('timestamp').alias('month'),
        year('timestamp').alias('year'),
    )
    .distinct()
)

# NOTE(review): presumably year/month are meant as partition columns when this
# table is written — TODO confirm.

In [328]:
# Reload the persisted dimensions and pair each song with its artist's name.
song_df = spark.read.parquet('songs.parquet')
artist_df = spark.read.parquet('artists.parquet')

# Joining on the column name keeps a single artist_id column in the result.
joined_df = (
    song_df.join(artist_df, on='artist_id')
    .select('song_id', 'title', 'name', 'duration', 'artist_id')
)

In [312]:
# Show the first rows of the song/artist lookup (long strings truncated).
joined_df.show(n=20, truncate=True)

+------------------+--------------------+--------------------+---------+------------------+
|           song_id|               title|                name| duration|         artist_id|
+------------------+--------------------+--------------------+---------+------------------+
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|            Tiny Tim| 43.36281|ARPBNLO1187FB3D52F|
|SONYPOM12A8C13B2D7|I Think My Wife I...|          Tim Wilson|186.48771|ARDNS031187B9924F0|
|SODREIN12A58A7F2E5|A Whiter Shade Of...|         King Curtis|326.00771|ARLTWXK1187FB5A3F8|
|SOYMRWW12A6D4FAB14|The Moon And I (O...|Jeff And Sheri Ea...| 267.7024|ARKFYS91187B98E58F|
|SOWQTQZ12A58A7B63E|Streets On Fire (...|         Lupe Fiasco|279.97995|ARPFHN61187FB575F6|
|SOUDSGM12AC9618304|Insatiable (Instr...|                 Clp|266.39628|ARNTLGG11E2835DDB9|
|SOPEGZN12AB0181B3D|Get Your Head Stu...|        Soul Mekanik| 45.66159|AREDL271187FB40F44|
|SOBAYLL12A8C138AF9|Sono andati? Fing...|Montserrat Caball...|511.16363|ARDR4AC1

In [340]:
# Left join keeps every NextSong event; song_id/artist_id stay null unless the
# title, artist name and exact duration all match a known song.
songplay_df = df.join(
    joined_df,
    on=[
        df.song == joined_df.title,
        df.artist == joined_df.name,
        df.length == joined_df.duration,
    ],
    how='left',
).withColumn('songplay_id', monotonically_increasing_id())

In [333]:
# Songplay fact table. Bound to a new name instead of overwriting `songplay_df`:
# the inspection cell below still selects song/artist/duration from
# `songplay_df`, which fails after a Restart & Run All if those columns were
# projected away here.
songplayTable = songplay_df.select(
    col('songplay_id'),
    # Second-resolution start_time, matching the time dimension's key.
    date_format('timestamp', 'yyyy-MM-dd HH:mm:ss').cast(TimestampType()).alias('start_time'),
    col('userId').alias('user_id'),
    col('level'),
    col('song_id'),
    col('artist_id'),
    col('sessionId').alias('session_id'),
    col('location'),
    col('userAgent').alias('user_agent'),
)

In [342]:
# Spot-check one artist's plays to see whether the song/artist match succeeded.
songplay_df.filter(col('artist') == 'Elena').select(['songplay_id', 'song', 'artist', 'duration']).show()

+-------------+--------------+------+---------+
|  songplay_id|          song|artist| duration|
+-------------+--------------+------+---------+
|1589137899536|Setanta matins| Elena|269.58322|
+-------------+--------------+------+---------+



In [338]:
# Look up the artist record referenced in the check above.
artist_df.filter(col('artist_id') == 'AR5KOSW1187FB35FF4').show()

+------------------+-----+---------+---------+---------+
|         artist_id| name| location|lattitude|longitude|
+------------------+-----+---------+---------+---------+
|AR5KOSW1187FB35FF4|Elena|Dubai UAE| 49.80388| 15.47491|
+------------------+-----+---------+---------+---------+



In [None]:
# Import SparkSession
from pyspark.sql import SparkSession

In [None]:
import os
from getpass import getpass

# Connection settings for the challenge PostgreSQL database. Non-secret values
# can be overridden through environment variables; the password is never stored
# in the notebook. It is read from ESCALE_DB_PASS or prompted for interactively.
# NOTE(review): a password used to be hardcoded here; treat it as exposed and rotate it.
DB_HOST = os.environ.get('ESCALE_DB_HOST', 'challenge.prod.public.data.escale.com.br')
DB_NAME = os.environ.get('ESCALE_DB_NAME', 'challenge')
DB_PORT = os.environ.get('ESCALE_DB_PORT', '5432')
DB_USER = os.environ.get('ESCALE_DB_USER', 'challenge')
DB_PASS = os.environ.get('ESCALE_DB_PASS') or getpass(f'Password for {DB_USER}@{DB_HOST}: ')
SPARK_APP = 'Case Escale'

In [None]:
# Spark session with the PostgreSQL JDBC driver jar on the classpath.
# NOTE: getOrCreate() returns an existing session if one is already running in
# this kernel; static settings such as spark.jars may then not take effect, so
# restart the kernel before running this section.
# TODO: replace the placeholder path with the real location of the JDBC jar.
spark = (
    SparkSession.builder
    .appName(SPARK_APP)
    .config("spark.jars", "/path_to_postgresDriver/postgresql-postgresql-42.2.19.jre7.jar")
    .getOrCreate()
)

In [None]:
def tableReader(tableName, **jdbcOptions):
    """Load a table from the configured PostgreSQL database as a Spark DataFrame.

    Connection details come from the module-level DB_HOST, DB_PORT, DB_NAME,
    DB_USER and DB_PASS settings and the global ``spark`` session.

    Args:
        tableName: value for the JDBC ``dbtable`` option (a table name, or a
            parenthesised subquery with an alias).
        **jdbcOptions: optional extra JDBC reader options (e.g. ``fetchsize``),
            forwarded unchanged to ``DataFrameReader.options``.

    Returns:
        A DataFrame backed by the JDBC source.
    """
    # The connection settings are only read here, so no ``global`` statements are needed.
    url = f"jdbc:postgresql://{DB_HOST}:{DB_PORT}/{DB_NAME}"
    return (
        spark.read
        .format("jdbc")
        .option("url", url)
        .option("dbtable", tableName)
        .option("user", DB_USER)
        .option("password", DB_PASS)
        .option("driver", "org.postgresql.Driver")
        .options(**jdbcOptions)
        .load()
    )

In [None]:
# Load the telephony_types lookup table and inspect its columns.
telephonyTypesDf = tableReader(tableName='telephony_types')
telephonyTypesDf.printSchema()

In [None]:
# Count rows per non-null token. groupby().count() only builds a lazy DataFrame,
# and its repr shows just the schema unless spark.sql.repl.eagerEval.enabled is set,
# so call show() to actually compute and display the counts.
telephonyTypesDf.where(telephonyTypesDf.token.isNotNull()).groupby('token').count().show()

In [None]:
# Load the attendances table and inspect its columns.
attendancesDf = tableReader(tableName='attendances')
attendancesDf.printSchema()