# Imports

In [1]:
import os
from pyspark import SparkContext
import findspark

### Configuring credentials

In [2]:
import configparser

# Location of the credentials file. The notebook's original path is kept as the
# default; set DBUSER_CONFIG_PATH to point at a different file on another machine.
_config_path = os.environ.get(
    'DBUSER_CONFIG_PATH',
    '/Users/paulogier/81-GithubPackages/Udacity-Data-Engineer-NanoDegree/P4-Data_Lake_with_Spark/p4src/etl/dbuser_config.cfg'
)
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed, so fail here with a clear message instead of a
# confusing NoSectionError on the lookups below.
if not config.read(_config_path):
    raise FileNotFoundError("Could not read AWS credentials config file: " + _config_path)
# Expose the [AWS] KEY / SECRET entries through the process environment, where
# the later cells pick them up.
os.environ['AWS_KEY_ID'] = config.get("AWS", "KEY")
os.environ['AWS_SECRET'] = config.get("AWS", 'SECRET')

In [3]:
# Request the hadoop-aws package for the PySpark launcher. This is assigned
# before the session is built; presumably the value is only read when the
# JVM is started, so keep this line first.
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell"
from pyspark.sql import SparkSession

# Create (or reuse) the session used by every read in this notebook.
spark = (
    SparkSession.builder
    .appName('S3CSVRead')
    .getOrCreate()
)

# NOTE(review): this binds the s3a:// scheme to the s3native filesystem class
# rather than an S3A-specific one. The credential property names set in the
# following cell (fs.s3a.awsAccessKeyId / fs.s3a.awsSecretAccessKey) look like
# they depend on this choice, so change both together or not at all.
spark._jsc.hadoopConfiguration().set(
    "fs.s3a.impl",
    "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
)

In [4]:
# Copy the AWS credentials from the process environment into the Hadoop
# configuration of the running session, one property per environment variable.
_hadoop_conf = spark._jsc.hadoopConfiguration()
for _prop_name, _env_name in (
    ("fs.s3a.awsAccessKeyId", 'AWS_KEY_ID'),
    ("fs.s3a.awsSecretAccessKey", 'AWS_SECRET'),
):
    _hadoop_conf.set(_prop_name, os.environ.get(_env_name))


### Reading log (event) data

In [5]:
from pyspark.sql import types as T

# Explicit schema for the log (event) JSON files, so Spark does not have to
# infer it by scanning the data. Fields are listed alphabetically.
#
# Type fixes relative to the previous version of this cell:
#   * lastName     -> StringType: a name is text; IntegerType could never parse it.
#   * registration -> DoubleType: presumably an epoch value in milliseconds,
#                     which does not fit a 32-bit IntegerType. TODO confirm
#                     against a sample record.
#   * ts           -> LongType: presumably epoch milliseconds stored as an
#                     integer; reading it directly as TimestampType would not
#                     interpret it as milliseconds. Convert explicitly
#                     downstream (e.g. divide by 1000 and cast). TODO confirm.
#   * userId       -> StringType: the ids appear to be string-encoded in the
#                     raw files (TODO confirm); cast to an integer downstream
#                     if a numeric id is needed.
# A single field that cannot be parsed with its declared type can null out the
# value (or, depending on the Spark version, the whole row), so these types
# must match what the files actually contain.
s = T.StructType([
    T.StructField("artist", T.StringType()),
    T.StructField("auth", T.StringType()),
    T.StructField("firstName", T.StringType()),
    T.StructField("gender", T.StringType()),
    T.StructField("itemInSession", T.IntegerType()),
    T.StructField("lastName", T.StringType()),
    T.StructField("length", T.DoubleType()),
    T.StructField("level", T.StringType()),
    T.StructField("location", T.StringType()),
    T.StructField("method", T.StringType()),
    T.StructField("page", T.StringType()),
    T.StructField("registration", T.DoubleType()),
    T.StructField("sessionId", T.IntegerType()),
    T.StructField("song", T.StringType()),
    T.StructField("status", T.IntegerType()),
    T.StructField("ts", T.LongType()),
    T.StructField("userAgent", T.StringType()),
    T.StructField("userId", T.StringType())
])

In [6]:
# Build the glob that matches every log file two directory levels below the
# prefix, then load all of them with the explicit log schema `s`.
mybucket = 'udacity-dend'
myprefix = 'log_data/'
mypath = "s3a://{}/{}*/*/*.json".format(mybucket, myprefix)
print(mypath)

df = spark.read.schema(s).json(mypath)
# Row count as a quick sanity check that the files were found and read.
print(df.count())

s3a://udacity-dend/log_data/*/*/*.json
8056


In [7]:
# Explicit schema for the song metadata JSON files. Fields are listed
# alphabetically.
#
# `song_id` was missing from the previous version of this cell; a field that
# is not in the schema is not read at all, so the song identifier would be
# dropped. NOTE(review): assumes the song files carry a string `song_id`
# field -- confirm against a sample file. If the field is absent the column
# is simply null.
s2 = T.StructType([
        T.StructField("artist_id", T.StringType()),
        T.StructField("artist_latitude", T.DoubleType()),
        T.StructField("artist_location", T.StringType()),
        T.StructField("artist_longitude", T.DoubleType()),
        T.StructField("artist_name", T.StringType()),
        T.StructField("duration", T.DoubleType()),
        T.StructField("num_songs", T.IntegerType()),
        T.StructField("song_id", T.StringType()),
        T.StructField("title", T.StringType()),
        T.StructField("year", T.IntegerType()),
    ])


In [16]:
import boto3

# S3 client authenticated with the credentials exported to the environment
# by the configuration cell.
s3c = boto3.client("s3",
                  region_name='us-west-2',
                  aws_access_key_id=os.environ.get('AWS_KEY_ID'),
                  aws_secret_access_key=os.environ.get('AWS_SECRET')
                 )

# Collect every object key under the prefix. A single list call returns only
# one page of results, so walk all pages with a paginator; a page with no
# matches has no 'Contents' entry, hence the .get() with an empty default.
m = []
_pages = s3c.get_paginator('list_objects_v2').paginate(
    Bucket='udacity-dend', Prefix='song_data/A/A/'
)
for _page in _pages:
    m.extend(_obj['Key'] for _obj in _page.get('Contents', []))
# Number of objects found; compared by eye with the Spark row count below.
print(len(m))

604


In [17]:
# Load the song files under song_data/A/A/ (one more directory level, then
# the JSON files) using the explicit song schema `s2`.
mypath2 = "s3a://udacity-dend/song_data/A/A/*/*.json"
df2 = spark.read.schema(s2).json(mypath2)
# Row count as a quick sanity check on the read.
print(df2.count())

604
