# Part I. ETL Pipeline for Pre-Processing the Files

The goal of this project is to develop a data processing pipeline that extracts, transforms, and loads music app streaming data from CSV files into Apache Cassandra.

The pipeline enables querying and analysis of the music app history to answer specific questions about the artists, songs, and user interactions within the app.

The project demonstrates query-driven data modeling in Apache Cassandra, a distributed database built for high availability at large scale: each table below is designed around exactly one of the analytical queries it must answer.

#### Import Python packages 

In [1]:
import pandas as pd
from libs.databases.connectors.cassandra_db import CassandraConnector
from libs.databases.managers.cassandra_db import CassandraTableManager
from libs.files.collector import FileCollector
from libs.files.readers.csv_reader import CSVReader
from libs.files.writers.csv import CSVWriter

In [2]:
# Paths to the raw event files and to the combined, pre-processed output file
PATH_RAW_DATA = '/data/raw/event_data'
PATH_PROCESSED_FILE = '/data/processed/event_datafile_new.csv'

#### Creating a list of file paths to process the original event CSV data files

In [3]:
files = FileCollector(PATH_RAW_DATA)
file_paths = files.collect_files()
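`FileCollector` is part of the project's own `libs` package, so its implementation is not shown here. Here is a rough, hypothetical equivalent built on the standard library's `glob` module. It assumes the raw event files are `.csv` files that may sit in nested subdirectories of `PATH_RAW_DATA`:

```python
import glob
import os
from typing import List


def collect_csv_files(root: str) -> List[str]:
    """Return the sorted paths of every .csv file under `root`, searching recursively.

    Hypothetical stand-in for FileCollector.collect_files(); not the project's API.
    """
    # '**' with recursive=True matches zero or more directory levels,
    # so files directly in `root` are included as well.
    pattern = os.path.join(root, "**", "*.csv")
    return sorted(glob.glob(pattern, recursive=True))
```

Sorting the paths makes the concatenation order reproducible between runs. The row order of the combined file therefore stays the same too.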

#### Processing the files to create the CSV data file that will be used for the Apache Cassandra tables

In [4]:
reader = CSVReader(file_paths)
reader.apply_filter(lambda row: pd.notnull(row['artist']))
df_data = reader.data
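The reader step can be approximated in plain pandas. This sketch assumes that `CSVReader` concatenates the files into one DataFrame and that `apply_filter` keeps the rows for which the predicate is true. The helper name `read_event_files` is hypothetical. Rows with a null `artist` are assumed to be events other than song plays; the sample rows below all have `page` = `NextSong`. Those rows are dropped:

```python
from typing import List

import pandas as pd


def read_event_files(file_paths: List[str]) -> pd.DataFrame:
    """Concatenate the event CSV files and keep only rows with a non-null artist.

    Hypothetical equivalent of CSVReader(...).apply_filter(...); not the project's API.
    """
    frames = [pd.read_csv(path) for path in file_paths]
    df = pd.concat(frames, ignore_index=True)
    # Same predicate as the cell above: pd.notnull(row['artist'])
    return df[df["artist"].notnull()]
```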

In [5]:
df_data.head()

| | artist | auth | firstName | gender | itemInSession | lastName | length | level | location | method | page | registration | sessionId | song | status | ts | userId |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2 | Des'ree | Logged In | Kaylee | F | 1 | Summers | 246.30812 | free | Phoenix-Mesa-Scottsdale, AZ | PUT | NextSong | 1540340000000.0 | 139 | You Gotta Be | 200 | 1541110000000.0 | 8.0 |
| 4 | Mr Oizo | Logged In | Kaylee | F | 3 | Summers | 144.03873 | free | Phoenix-Mesa-Scottsdale, AZ | PUT | NextSong | 1540340000000.0 | 139 | Flat 55 | 200 | 1541110000000.0 | 8.0 |
| 5 | Tamba Trio | Logged In | Kaylee | F | 4 | Summers | 177.18812 | free | Phoenix-Mesa-Scottsdale, AZ | PUT | NextSong | 1540340000000.0 | 139 | Quem Quiser Encontrar O Amor | 200 | 1541110000000.0 | 8.0 |
| 6 | The Mars Volta | Logged In | Kaylee | F | 5 | Summers | 380.42077 | free | Phoenix-Mesa-Scottsdale, AZ | PUT | NextSong | 1540340000000.0 | 139 | Eriatarka | 200 | 1541110000000.0 | 8.0 |
| 7 | Infected Mushroom | Logged In | Kaylee | F | 6 | Summers | 440.2673 | free | Phoenix-Mesa-Scottsdale, AZ | PUT | NextSong | 1540340000000.0 | 139 | Becoming Insane | 200 | 1541110000000.0 | 8.0 |


In [6]:
writer = CSVWriter()
columns = [
    'artist',
    'firstName',
    'gender',
    'itemInSession',
    'lastName',
    'length',
    'level',
    'location',
    'sessionId',
    'song',
    'userId'
]

df_data = writer.write_data(file_path=PATH_PROCESSED_FILE, data=df_data, columns=columns)
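Writing the processed file amounts to selecting the listed columns and saving them without the index. The reassignment `df_data = writer.write_data(...)` suggests that `CSVWriter.write_data` returns the projected DataFrame, and the sketch below assumes so. `write_selected_columns` is a hypothetical stand-in, not the project's API:

```python
from typing import List

import pandas as pd


def write_selected_columns(df: pd.DataFrame, file_path: str,
                           columns: List[str]) -> pd.DataFrame:
    """Write only `columns` to `file_path` as CSV and return that projected frame."""
    projected = df[columns]
    # index=False keeps the pandas row index out of the output file
    projected.to_csv(file_path, index=False)
    return projected
```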

### Loading Data in Apache Cassandra

#### Connecting to the Apache Cassandra cluster

In [7]:
cassandra_conn = CassandraConnector(contact_points=['cassandra'])
cassandra_conn.connect()

**Create and set keyspace**

In [8]:
table_manager = CassandraTableManager(connector=cassandra_conn, keyspace='data_modeling')
table_manager.create_keyspace()
table_manager.set_keyspace()
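`CassandraTableManager.create_keyspace` is project code, so the exact statement it issues is not shown. A keyspace for a single-node development cluster is commonly created with `SimpleStrategy` replication. The hypothetical helper below builds such a statement and assumes a replication factor of 1:

```python
def create_keyspace_cql(keyspace: str, replication_factor: int = 1) -> str:
    """Build a CREATE KEYSPACE statement using SimpleStrategy replication.

    Hypothetical helper; the replication settings are an assumption, not the project's.
    """
    return (
        f"CREATE KEYSPACE IF NOT EXISTS {keyspace} "
        "WITH REPLICATION = {'class': 'SimpleStrategy', "
        f"'replication_factor': {replication_factor}}}"
    )
```

With the DataStax Python driver (`cassandra-driver`), the resulting string would be passed to `session.execute(...)`, followed by `session.set_keyspace('data_modeling')`. For multi-datacenter clusters, `NetworkTopologyStrategy` is the usual choice instead.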

#### Creating tables

In [9]:
from typing import Dict, List, Optional

def create_table(table_name: str, partition_key: List[str], columns: Dict[str, str],
                 df_data: pd.DataFrame, clustering_key: Optional[List[str]] = None) -> None:
    """
    Creates a table in the Apache Cassandra keyspace, defines its schema, and inserts data into it.

    Uses the module-level `table_manager` created above.

    Args:
        table_name (str): The name of the table to create.
        partition_key (List[str]): Columns that form the partition key.
        columns (Dict[str, str]): Column names mapped to their CQL data types.
        df_data (pd.DataFrame): The DataFrame containing the data to insert into the table.
        clustering_key (Optional[List[str]]): Clustering columns, in sort order. Defaults to none.

    Returns:
        None.
    """
    # Drop the table if it already exists so every run starts from a clean state
    table_manager.drop_table(table_name)

    # Create the table with the specified schema (avoids a mutable default argument)
    table_manager.create_table(
        table_name=table_name,
        columns=columns,
        partition_key=partition_key,
        clustering_key=clustering_key or [],
    )

    # Keep only the table's columns and convert the rows to a list of dictionaries
    data = df_data[list(columns)].to_dict(orient='records')

    # Insert the data into the table
    table_manager.insert_data(table_name=table_name, data=data)
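The table manager's internals are not shown, so here is a hypothetical sketch of the CQL that `create_table` and `insert_data` plausibly generate. One point holds regardless of the helper: the partition key columns are wrapped in their own parentheses inside `PRIMARY KEY`, and any clustering columns follow them. The `%s` placeholders match the positional parameter style of the DataStax Python driver's `Session.execute`:

```python
from typing import Dict, List, Optional


def create_table_cql(table_name: str, columns: Dict[str, str],
                     partition_key: List[str],
                     clustering_key: Optional[List[str]] = None) -> str:
    """Build a CREATE TABLE statement with a (possibly composite) partition key."""
    column_defs = ", ".join(f"{name} {cql_type}" for name, cql_type in columns.items())
    # Inner parentheses group the partition key; clustering columns come after it
    partition = f"({', '.join(partition_key)})"
    primary_key = ", ".join([partition, *(clustering_key or [])])
    return (f"CREATE TABLE IF NOT EXISTS {table_name} "
            f"({column_defs}, PRIMARY KEY ({primary_key}))")


def insert_cql(table_name: str, columns: List[str]) -> str:
    """Build an INSERT statement with one positional placeholder per column."""
    placeholders = ", ".join(["%s"] * len(columns))
    return f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({placeholders})"
```

For `song_length` this yields `PRIMARY KEY ((sessionId, itemInSession))`. That is a composite partition key with no clustering columns, which is exactly what the first query's two equality conditions need.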


In [10]:
table_name = 'song_length'

columns = {
    'artist': 'text',
    'song': 'text',
    'length': 'float',
    'sessionId': 'int',
    'itemInSession': 'int'
}

partition_key = ['sessionId', 'itemInSession']

create_table(
    table_name=table_name,
    partition_key=partition_key,
    columns=columns,
    df_data=df_data
)

In [11]:
table_name = "song_by_user_and_session"

columns = {
    "artist": "text",
    "song": "text",
    "itemInSession": "int",
    "firstName": "text",
    "lastName": "text",
    "userId": "float",
    "sessionId": "int",
}

partition_key = ["userId", "sessionId"]
clustering_key = ["itemInSession"]

create_table(
    table_name=table_name,
    partition_key=partition_key,
    clustering_key=clustering_key,
    columns=columns,
    df_data=df_data
)
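Within a partition, Cassandra stores rows sorted by the clustering columns, in ascending order by default. The query on this table can therefore return songs in `itemInSession` order without any client-side sort. The in-memory simulation below is a hypothetical illustration, not how Cassandra is implemented. It groups rows by `(userId, sessionId)` and orders each group by `itemInSession`:

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def partition_and_cluster(rows: List[Row], partition_key: List[str],
                          clustering_key: List[str]) -> Dict[Tuple[Any, ...], List[Row]]:
    """Group rows by partition key, then sort each partition by its clustering columns."""
    partitions: Dict[Tuple[Any, ...], List[Row]] = defaultdict(list)
    for row in rows:
        # Rows with equal partition-key values land in the same partition
        partitions[tuple(row[k] for k in partition_key)].append(row)
    for partition in partitions.values():
        # Clustering columns define the order within a partition (ascending here)
        partition.sort(key=lambda r: tuple(r[k] for k in clustering_key))
    return dict(partitions)
```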

In [12]:
table_name = "users_by_song"
columns = {
    "firstName": "text",
    "lastName": "text",
    "song": "text",
}
partition_key = ["song"]
clustering_key = ["firstName", "lastName"]

create_table(
    table_name=table_name,
    partition_key=partition_key,
    clustering_key=clustering_key,
    columns=columns,
    df_data=df_data
)
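An `INSERT` in Cassandra is an upsert: writing a row whose primary key already exists overwrites it instead of adding a duplicate. With `PRIMARY KEY ((song), firstName, lastName)`, a user who played the song many times therefore appears only once. The simulation below uses a hypothetical helper, `upsert_rows`, to show that effect. It also shows the flip side: two different users who share a first and last name would collapse into a single row. Appending `userId` as a final clustering column would prevent that:

```python
from typing import Dict, List, Tuple


def upsert_rows(rows: List[Dict[str, str]],
                primary_key: List[str]) -> Dict[Tuple[str, ...], Dict[str, str]]:
    """Simulate Cassandra write semantics: a row with an existing primary key overwrites it."""
    table: Dict[Tuple[str, ...], Dict[str, str]] = {}
    for row in rows:
        # Last write wins for a given primary key
        table[tuple(row[k] for k in primary_key)] = row
    return table
```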

### Business Insights

#### Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338 and itemInSession = 4

In [13]:
query = """
    SELECT artist, song, length, sessionId, itemInSession
    FROM song_length
    WHERE sessionId = 338
      AND itemInSession = 4
"""

df = table_manager.query_to_dataframe(query)
df

| | artist | song | length | sessionid | iteminsession |
|---|---|---|---|---|---|
| 0 | Faithless | Music Matters (Mark Knight Dub) | 495.307312 | 338 | 4 |
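The returned column names are lowercase (`sessionid`, `iteminsession`) because CQL folds unquoted identifiers to lowercase. So `sessionId` in the schema becomes `sessionid` in the table. Double-quoting the identifiers (`"sessionId"`) in every statement preserves the case. Alternatively, the DataFrame can be renamed after the query. The mapping below is a hypothetical convenience, not part of the project's `libs` package:

```python
from typing import Dict

import pandas as pd

# Hypothetical mapping from Cassandra's lowercased names back to the CSV's camelCase names
CAMEL_CASE: Dict[str, str] = {
    "sessionid": "sessionId",
    "iteminsession": "itemInSession",
    "firstname": "firstName",
    "lastname": "lastName",
    "userid": "userId",
}


def restore_camel_case(df: pd.DataFrame) -> pd.DataFrame:
    """Rename lowercased columns; names not in the mapping are left unchanged."""
    return df.rename(columns=CAMEL_CASE)
```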


#### Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

In [14]:
query = """
    SELECT artist, song, firstName, lastName
    FROM song_by_user_and_session
    WHERE userId = 10 AND sessionId = 182
    ORDER BY itemInSession
"""

df = table_manager.query_to_dataframe(query)
df

| | artist | song | firstname | lastname |
|---|---|---|---|---|
| 0 | Down To The Bone | Keep On Keepin' On | Sylvie | Cruz |
| 1 | Three Drives | Greece 2000 | Sylvie | Cruz |
| 2 | Sebastien Tellier | Kilometer | Sylvie | Cruz |
| 3 | Lonnie Gordon | Catch You Baby (Steve Pitron & Max Sanna Radio... | Sylvie | Cruz |


#### Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

In [15]:
query = """
    SELECT firstName, lastName, song
    FROM users_by_song
    WHERE song = 'All Hands Against His Own'
"""
df = table_manager.query_to_dataframe(query)
df

| | firstname | lastname | song |
|---|---|---|---|
| 0 | Jacqueline | Lynch | All Hands Against His Own |
| 1 | Sara | Johnson | All Hands Against His Own |
| 2 | Tegan | Levine | All Hands Against His Own |


### Drop the tables before closing the session

In [16]:
table_manager.drop_table('song_length')
table_manager.drop_table('song_by_user_and_session')
table_manager.drop_table('users_by_song')

In [17]:
cassandra_conn.disconnect()
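Pairing `connect()` and `disconnect()` by hand means a failing cell can leave the connection open. A small context manager guarantees that the disconnect runs. The sketch below assumes only that the connector exposes the `connect()` and `disconnect()` methods used in this notebook. The name `connected` is hypothetical:

```python
from contextlib import contextmanager
from typing import Iterator, Protocol


class Connector(Protocol):
    """Structural type: anything with connect() and disconnect() methods."""

    def connect(self) -> None: ...

    def disconnect(self) -> None: ...


@contextmanager
def connected(connector: Connector) -> Iterator[Connector]:
    """Connect on entry and always disconnect on exit, even if the body raises."""
    connector.connect()
    try:
        yield connector
    finally:
        connector.disconnect()
```

In a script version of this pipeline, the table work would go inside a `with connected(CassandraConnector(contact_points=['cassandra'])) as conn:` block.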