## Imports

In [1]:
!pip install --upgrade pip
!pip install sagemaker_pyspark
!pip install pyspark
!pip install gdown

Collecting pip
  Downloading pip-21.1.2-py3-none-any.whl (1.5 MB)
[K     |████████████████████████████████| 1.5 MB 34.8 MB/s eta 0:00:01
[?25hInstalling collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 21.0.1
    Uninstalling pip-21.0.1:
      Successfully uninstalled pip-21.0.1
Successfully installed pip-21.1.2
Collecting gdown
  Downloading gdown-3.13.0.tar.gz (9.3 kB)
  Installing build dependencies ... [?25ldone
[?25h  Getting requirements to build wheel ... [?25ldone
[?25h    Preparing wheel metadata ... [?25ldone
Collecting tqdm
  Downloading tqdm-4.61.1-py2.py3-none-any.whl (75 kB)
[K     |████████████████████████████████| 75 kB 5.2 MB/s  eta 0:00:01
Building wheels for collected packages: gdown
  Building wheel for gdown (PEP 517) ... [?25ldone
[?25h  Created wheel for gdown: filename=gdown-3.13.0-py3-none-any.whl size=9034 sha256=474c320b03afb67ec9b99b3da0ac3d598fb712449fdac418b37fd0a3086dcf9e
  Stored in directory: /home/ec2-u

# Session setup

## Current user's data retrieval

In [2]:
import botocore.session

# Resolve the AWS credentials of the current user through botocore.
session = botocore.session.get_session()
credentials = session.get_credentials()

# get_credentials() yields None when no credentials can be located; fail here
# with a clear message instead of an AttributeError when the keys are read later.
if credentials is None:
    raise RuntimeError("No AWS credentials found for the current session")

## S3 setup

In [3]:
# Create the S3 client and name the bucket that the rest of the notebook
# lists, downloads from and uploads to.
import boto3

s3 = boto3.client('s3')
bucket = "cristo-test"

## PySpark setup

In [4]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
import sagemaker_pyspark

# Put the jars shipped with sagemaker_pyspark on the driver's extra classpath.
conf = SparkConf().set(
    "spark.driver.extraClassPath",
    ":".join(sagemaker_pyspark.classpath_jars()),
)

# Build (or reuse) the Spark session, passing the current AWS key pair
# through the fs.s3a.* settings.
spark = (
    SparkSession.builder
    .config(conf=conf)
    .config('fs.s3a.access.key', credentials.access_key)
    .config('fs.s3a.secret.key', credentials.secret_key)
    .appName("recommender-system")
    .getOrCreate()
)

## Utility functions

### S3 functions

These functions are a bridge to S3 using the `boto3` module

In [5]:
from typing import Union, List
import os
import requests

def list_files(client, bucket: str) -> List[str]:
    """Return the keys of every object stored in an S3 bucket.

    Args:
        client: a boto3 S3 client.
        bucket: name of the bucket to list.

    Returns:
        The object keys, in the order S3 returns them. An empty list is
        returned when the bucket holds no objects.
    """
    # A single listing call returns at most 1000 keys, so walk every page.
    paginator = client.get_paginator("list_objects_v2")
    filenames = []
    for page in paginator.paginate(Bucket=bucket, Prefix=""):
        # "Contents" is missing from the response when nothing matches
        # (e.g. an empty bucket), so default to an empty list.
        filenames.extend(entry["Key"] for entry in page.get("Contents", []))
    return filenames

def get_object(client, bucket: str, filepath: str) -> bytes:
    """Fetch an object from S3 and return its full content.

    Args:
        client: a boto3 S3 client.
        bucket: name of the bucket holding the object.
        filepath: key of the object inside the bucket.

    Returns:
        Whatever `read()` on the response body yields, i.e. the raw bytes of
        the object (not a decoded string).
    """
    response = client.get_object(Bucket=bucket, Key=filepath)
    body = response["Body"].read()
    return body

def load_dataframe(client, bucket: str, filepaths: Union[str, List[str]]):
    """Download CSV files from S3 (when not already local) and load them with Spark.

    Each key is saved in the current working directory under its base name;
    files that already exist there are not downloaded again.

    Args:
        client: a boto3 S3 client used for the downloads.
        bucket: name of the bucket holding the files.
        filepaths: a single S3 key or a list of S3 keys.

    Returns:
        A Spark dataframe read from the local copies (with header), restricted
        to the columns event_time, user_id, event_type and product_id.

    Note:
        Relies on the notebook-level `spark` session.
    """
    # eventually casts a single filepath to a list
    if isinstance(filepaths, str):
        filepaths = [filepaths]
    local_names = [os.path.basename(filepath) for filepath in filepaths]
    # downloads the datasets from S3
    for filepath, local_name in zip(filepaths, local_names):
        if not os.path.exists(local_name):
            print(f"Downloading {filepath} to {local_name}")
            # Use the client handed in by the caller, not the global `s3`.
            client.download_file(bucket, filepath, local_name)
    df = spark.read.csv(local_names, header=True).select("event_time", "user_id", "event_type", "product_id")
    return df

# Data downloading

Since the datasets are hosted on Google Drive, this code:

- downloads the compressed files (`.csv.gz`) to this notebook's space
- decompresses the previously downloaded files (`.csv.gz` $\rightarrow$ `.csv`)
- uploads the files to an S3 bucket

In [6]:
import gdown
import gzip
import shutil
    
# (archive file name, Google Drive file id) for each dataset
datasets_ids = [
    ("Dec.csv.gz", "1qZIwMbMgMmgDC5EoMdJ8aI9lQPsWA3-P"),
    ("Jan.csv.gz", "1x5ohrrZNhWQN4Q-zww0RmXOwctKHH9PT"),
    ("Feb.csv.gz", "1-Rov9fFtGJqb7_ePc6qH-Rhzxn0cIcKB"),
    ("Mar.csv.gz", "1zr_RXpGvOWN2PrWI6itWL8HnRsCpyqz8"),
    ("Apr.csv.gz", "1g5WoIgLe05UMdREbxAjh0bEFgVCjA1UL")
]

# List the bucket once instead of once per dataset: the names handled by this
# loop are all distinct, so the snapshot stays valid for the whole loop.
files_on_s3 = set(list_files(client=s3, bucket=bucket))

for dataset_name, dataset_id in datasets_ids:
    # name of the decompressed file, also used as the S3 key
    csv_name = dataset_name.replace(".gz", "")

    # check if the file is already on S3
    if csv_name in files_on_s3:
        print(f"{csv_name} already on S3")
        continue

    # downloads the data (skipped when the archive or its extracted CSV is already local)
    if os.path.exists(dataset_name) or os.path.exists(csv_name):
        print(f"{dataset_name} already downloaded")
    else:
        print(f"Downloading {dataset_name}...")
        gdown.download(f"https://drive.google.com/uc?id={dataset_id}", dataset_name, quiet=False)

    # extracts the archives
    if os.path.exists(csv_name):
        print(f"{dataset_name} already extracted")
    else:
        print(f"Extracting {dataset_name} to {csv_name}...")
        # Extract to a temporary name and rename at the end, so an interrupted
        # extraction never leaves a truncated .csv that a later run would
        # treat as "already extracted" and upload.
        partial_name = csv_name + ".part"
        with gzip.open(dataset_name, 'rb') as fp_in, open(partial_name, 'wb') as fp_out:
            shutil.copyfileobj(fp_in, fp_out)
        os.replace(partial_name, csv_name)

    # uploads to S3
    print(f"Uploading {csv_name} to S3...")
    with open(csv_name, "rb") as fp:
        s3.upload_fileobj(fp, bucket, csv_name)
    print(f"Successfully Uploaded {csv_name} to S3")

Dec.csv already on S3
Jan.csv already on S3
Feb.csv already on S3
Mar.csv already on S3
Apr.csv already on S3


In [None]:
import time
import re

starting_time = time.time()

# finds the locations of the .csv files
# A plain suffix test is used: the pattern r".*.csv" has an unescaped dot and
# no end anchor, so it would also accept keys such as "Dec.csv.gz".
csv_keys = [filename for filename in list_files(client=s3, bucket=bucket) if filename.endswith(".csv")]
if not csv_keys:
    raise FileNotFoundError(f"No .csv files found in bucket {bucket!r}")

# NOTE(review): only the first CSV is loaded here; pass `csv_keys` instead to
# load every dataset — confirm which one is intended.
csvs = csv_keys[0]
df = load_dataframe(client=s3, bucket=bucket, filepaths=csvs)

print()
df.printSchema()
print(f"|df| = {df.count()}")
df.show(4)

# print(f"\t...done in {time.time() - starting_time}")


root
 |-- event_time: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: string (nullable = true)



# Data preprocessing

We now have the datasets uploaded to S3

This code does the following:

- downloads the `.csv` datasets from S3 to this notebook's space
- loads the datasets into a PySpark dataframe
- preprocesses the dataframe to:
    - transform the implicit feedback into explicit feedback
    - remap the ids to small integers, since the recommender system does not accept the original id values

In [None]:

from cc_project import datasets

# Replace the loaded dataframe with its preprocessed version, then show its
# schema and first rows.
# NOTE(review): `df` is overwritten with a function of itself, so re-running
# this cell without re-running the loading cell preprocesses the data twice —
# confirm that preprocess_dataframe tolerates that.
df = datasets.preprocess_dataframe(df=df)
df.printSchema()
df.show(4)


# upload_file_from_url(client=s3, bucket=bucket, url='https://www.facebook.com/favicon.ico', verbose=True)
#     print(csv)
#     df = load_dataframe(client=s3, bucket=bucket, filepath="Dec.csv", preprocess=True)
#     df.show()

In [None]:
# !cd cc_project; bash s3_bucket/train_scheduler.sh