# Environment Setup

## Import packages and modules.

In [None]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

## Read configuration file and get AWS credentials.

In [None]:
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed, so check the result to fail early and clearly.
if not config.read('dl.cfg'):
    raise FileNotFoundError("Could not read configuration file 'dl.cfg'")

# configparser is indexed by section first, then by key, and os.environ only
# accepts str values, so each credential must be read from inside a section.
# NOTE(review): assumes dl.cfg stores the credentials under an [AWS] section --
# confirm against the actual file layout.
aws_config = config['AWS']
os.environ['AWS_ACCESS_KEY_ID'] = aws_config['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = aws_config['AWS_SECRET_ACCESS_KEY']

# Spark Cluster Setup (IN TERMINAL)

In [None]:
## Create an EMR cluster with 3 instances (1 master, 1 core and 1 task node)

In [None]:
# boto3 and json were used in this cell without ever being imported.
import json

import boto3


def _instance_group(name, role, instance_type='m4.large', instance_count=1,
                    volume_size_gb=10):
    """Return one ``InstanceGroups`` entry for ``run_job_flow``.

    The group is launched on the ON_DEMAND market, and each of its nodes gets
    a single gp2 EBS volume of ``volume_size_gb`` GB (EBS optimisation off).
    """
    return {
        'Name': name,
        'Market': 'ON_DEMAND',
        'InstanceRole': role,
        'InstanceType': instance_type,
        'InstanceCount': instance_count,
        'EbsConfiguration': {
            'EbsBlockDeviceConfigs': [
                {
                    'VolumeSpecification': {
                        'VolumeType': 'gp2',
                        'SizeInGB': volume_size_gb,
                    },
                    'VolumesPerInstance': 1,
                },
            ],
            'EbsOptimized': False,
        },
    }


client = boto3.client('emr', region_name='us-east-1')

# One master, one core and one task node, all with the same shape: 3 instances.
# NOTE(review): with KeepJobFlowAliveWhenNoSteps=True the cluster stays up after
# launch until it is terminated explicitly. Also, no `Applications` list is
# passed, so confirm the release's defaults include whatever (e.g. Spark) the
# ETL needs on the cluster.
response = client.run_job_flow(
    Name="Boto3 test cluster",
    ReleaseLabel='emr-6.3.0',
    Instances={
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': False,
        'InstanceGroups': [
            _instance_group('Master', 'MASTER'),
            _instance_group('Core', 'CORE'),
            _instance_group('Task', 'TASK'),
        ],
        'Ec2KeyName': 'EMR Key Pair',
    },
    VisibleToAllUsers=True,
    ServiceRole='EMR_DefaultRole',
    JobFlowRole='EMR_EC2_DefaultRole',
    AutoScalingRole="EMR_AutoScaling_DefaultRole",
)

# default=str stringifies any response values json cannot encode natively.
print(json.dumps(response, indent=4, sort_keys=True, default=str))

# ETL Processing

## Create Spark Session

In [None]:
def create_spark_session():
    """Return a SparkSession for reading and writing data with Spark.

    The session is built with ``spark.jars.packages`` set to the
    ``hadoop-aws`` artifact (Hadoop's AWS connector). ``getOrCreate()`` reuses
    an already-running session instead of starting a second one.
    """
    # NOTE(review): hadoop-aws generally has to match the Hadoop version Spark
    # runs against -- confirm 2.7.0 fits the target runtime.
    hadoop_aws_package = "org.apache.hadoop:hadoop-aws:2.7.0"
    session_builder = SparkSession.builder.config(
        "spark.jars.packages", hadoop_aws_package
    )
    return session_builder.getOrCreate()

## Unzip data files

In [1]:
import zipfile

# Extract each raw-data archive into data/ so later cells can read the
# extracted files from there.
for archive_path in ('data/log-data.zip', 'data/song-data.zip'):
    with zipfile.ZipFile(archive_path, 'r') as archive:
        archive.extractall('data/')

## Read a single song_data file into a Spark DataFrame

In [None]:
song_path = "data/..."
user_log = spark.read.json(song_path)


## Read a single log_data file into a Spark DataFrame

In [None]:
log_path = "data/..."
user_log = spark.read.json(log_path)
