In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, countDistinct, to_date

### Task 2: Importing and Managing Data in Amazon S3 (Marks: 5/25)

#### 2.2) List the files present in your Amazon S3 bucket. Provide screenshots of the command execution and the output.

### Task 3: Data Processing with Apache Spark via Amazon EMR (Marks: 15/25)

In [None]:
spark = SparkSession.builder.appName('a01-big-data-analytics').getOrCreate()

In [None]:
spark

#### 3.1 Spark DataFrame Approach

In [None]:
df1 = spark.read.csv('./data/voice_sample.csv', header=True, inferSchema=True)
df1.printSchema()

In [None]:
df1.count()

In [None]:
df1.show(5)

In [None]:
df1 = df1.withColumn('CALL_DATE', to_date('CALL_TIME', 'yyyyMMddHHmmss'))
df1.printSchema()

In [None]:
df1.show(5)

In [None]:
df1.select('CALL_DATE').distinct().sort('CALL_DATE').show()

In [None]:
TOTAL_NO_OF_DISTINCT_CALL_DATE = df1.select('CALL_DATE').distinct().count()
TOTAL_NO_OF_DISTINCT_CALL_DATE

In [None]:
df1.agg(countDistinct('CALL_DATE')).show()

In [None]:
df2 = spark.read.csv('./data/cell_centers.csv', header=True, inferSchema=True)
df2.printSchema()

In [None]:
df2.count()

In [None]:
df2.show(5)

In [None]:
df2.select('PROVINCE_NAME').distinct().show()

##### 3.1.1) Using an Amazon EMR notebook and Spark DataFrame API, extract the unique CALLER_IDs of users who have made at least one call every day. The calls must have been made from the Western Province. Include the Spark commands used and the output.

In [None]:
df3 = (
    df2
    .filter(df2['PROVINCE_NAME'] == 'Western')
    .join(df1, ['LOCATION_ID'])
    .groupBy('CALLER_ID').agg(countDistinct('CALL_DATE').alias('NO_OF_DISTINCT_CALL_DATE'))
    .filter(col('NO_OF_DISTINCT_CALL_DATE') == df1.select('CALL_DATE').distinct().count())
    .select('CALLER_ID')
)

In [None]:
df3.count()

In [None]:
df3.show(5)

#### 3.2 Spark SQL Approach

In [None]:
df1 = spark.read.csv('./data/voice_sample.csv', header=True, inferSchema=True)
df1.printSchema()

In [None]:
df2 = spark.read.csv('./data/cell_centers.csv', header=True, inferSchema=True)
df2.printSchema()

In [None]:
df1.createOrReplaceTempView("voice_sample")
df2.createOrReplaceTempView("cell_centers")

In [None]:
df1 = spark.sql('SELECT *, TO_DATE(CALL_TIME, "yyyyMMddHHmmss") AS CALL_DATE FROM voice_sample')
df1.show(5)

In [None]:
df1.createOrReplaceTempView("voice_sample")

##### 3.2.1) Repeat the same task as in 3.1.1 but use Spark SQL for data extraction. Include the Spark SQL commands used and the output.

In [None]:
df3 = spark.sql(
    '''
    SELECT CALLER_ID
    FROM (
        SELECT CALLER_ID, COUNT(DISTINCT CALL_DATE) AS NO_OF_DISTINCT_CALL_DATE
        FROM (
            SELECT LOCATION_ID
            FROM cell_centers
            WHERE PROVINCE_NAME = 'Western'
        )
        LEFT JOIN voice_sample
        USING (LOCATION_ID)
        GROUP BY CALLER_ID
    )
    WHERE NO_OF_DISTINCT_CALL_DATE = (SELECT COUNT(DISTINCT CALL_DATE) FROM voice_sample)
    '''
)

In [None]:
df3.count()

In [None]:
df3.show(5)

In [None]:
# Stop the Spark session; later cells would need a new session.
spark.stop()