# Import Data

In [0]:
# connect to s3 bucket
# get credentials from the environment (never hard-code them in the notebook)
import os

ACCESS_KEY = os.getenv("AWS_ACCESS_KEY")
SECRET_KEY = os.getenv("AWS_SECRET_KEY")
if not ACCESS_KEY or not SECRET_KEY:
  # Fail with a clear message instead of an AttributeError on None.replace below.
  raise RuntimeError("AWS_ACCESS_KEY and AWS_SECRET_KEY environment variables must be set")
# The secret key is embedded in the s3a URI, so "/" must be percent-encoded.
ENCODED_SECRET_KEY = SECRET_KEY.replace("/", "%2F")
AWS_BUCKET_NAME = "aida-project"
MOUNT_NAME = "data"
MOUNT_POINT = "/mnt/%s" % MOUNT_NAME

# mount data; if mounting fails (presumably because the mount point already
# exists from an earlier run) reuse the existing mount.
try:
  dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME), MOUNT_POINT)
except Exception as exc:
  # Print only the exception type: the message may echo the source URI, which contains the secret.
  print("Mount skipped (%s); using existing mount at %s" % (type(exc).__name__, MOUNT_POINT))
display(dbutils.fs.ls(MOUNT_POINT))

path,name,size
dbfs:/mnt/data/,data/,0
dbfs:/mnt/data/TSV/,TSV/,0
dbfs:/mnt/data/niy/,niy/,0
dbfs:/mnt/data/team-remote/,team-remote/,0
dbfs:/mnt/data/team_remote/,team_remote/,0


In [0]:
# List the raw IMDb TSV dumps available on the mounted bucket.
display(dbutils.fs.ls(f"/mnt/{MOUNT_NAME}/TSV"))

path,name,size
dbfs:/mnt/data/TSV/name.basics.tsv,name.basics.tsv,579976550
dbfs:/mnt/data/TSV/title.akas.tsv,title.akas.tsv,969441812
dbfs:/mnt/data/TSV/title.basics.tsv,title.basics.tsv,537519832
dbfs:/mnt/data/TSV/title.principals.tsv,title.principals.tsv,1622240736
dbfs:/mnt/data/TSV/title.ratings.tsv,title.ratings.tsv,16907124


In [0]:
def _read_imdb_tsv(file_name):
  r"""Load one IMDb TSV dump from the mounted bucket as a Spark DataFrame.

  - sep="\t", header, inferSchema: the dumps are tab-separated with a header row.
  - quote="": turns off quote handling (Spark's default quote char is `"`);
    the dumps are not quoted, and a field that happens to start with a double
    quote (e.g. some titles) could otherwise be misparsed.
  - IMDb's missing-value marker "\N" is deliberately kept as a literal string,
    because later stages filter/drop it explicitly.
  """
  return spark.read.load(
    f"dbfs:/mnt/{MOUNT_NAME}/TSV/{file_name}",
    format="csv", sep="\t", inferSchema="true", header="true", quote="",
  )

df_names = _read_imdb_tsv("name.basics.tsv")
df_akas = _read_imdb_tsv("title.akas.tsv")
df_basics = _read_imdb_tsv("title.basics.tsv")
df_principals = _read_imdb_tsv("title.principals.tsv")
df_ratings = _read_imdb_tsv("title.ratings.tsv")

In [0]:
# Print the inferred schema of every raw table as a quick sanity check.
list_dfs = [df_names, df_akas, df_basics, df_principals, df_ratings]

for table in list_dfs:
  table.printSchema()

# Feature Selection & Data Cleaning

In [0]:
from pyspark.sql.functions import mean as _mean, \
                                  min as _min, \
                                  max as _max, \
                                  count as _count, \
                                  stddev as _stddev, \
                                  countDistinct, col, isnan, split, array_distinct, explode

## Sampling

In [0]:
# Fraction of rated titles to keep: use a value < 1.0 while iterating on the
# pipeline, and 1.0 for the final model evaluation.
random_state = 42
sample_size = 1.0

train_size = 0.8
test_size = 1 - train_size

def sample(dataframe):
  """Select the id/label columns of `dataframe` and draw a seeded sample.

  Args:
    dataframe: ratings DataFrame with at least `tconst` and `averageRating`.

  Returns:
    DataFrame with columns `tconst` and `averageRating`, sampled without
    replacement with fraction `sample_size` and seed `random_state`
    (Spark's sampling keeps approximately, not exactly, that fraction).

  Raises:
    ValueError: if `sample_size` is not in (0, 1].
  """
  if not 0 < sample_size <= 1:
    raise ValueError("sample_size must be in (0, 1], got %r" % (sample_size,))
  # Keyword arguments make the intent explicit: positionally, the first
  # parameter of DataFrame.sample is `withReplacement`, not the fraction.
  return dataframe.select('tconst', 'averageRating').sample(fraction=sample_size, seed=random_state)

## Table: Ratings

In [0]:
def votes(dataframe):
  """Join the sampled ids back onto `df_ratings` to pick up its remaining columns.

  `averageRating` is part of the join key so it is not duplicated in the
  result. Uses Spark's default (inner) join.
  """
  return dataframe.join(df_ratings, on=['tconst', 'averageRating'])

## Table: Principals

In [0]:
def principals(dataframe):
  """Add principal-cast features from `df_principals` to each title in `dataframe`.

  Produces, per `tconst`:
    - principal_counts: the highest `ordering` value among the title's principals
    - distinct_count_categories: number of distinct principal categories
    - one column per `category` value holding how many principals have that
      category (0 when absent).

  Assumes `dataframe` has columns `tconst`, `averageRating` and `numVotes`;
  any other columns are dropped by the aggregation. Titles without any
  principals are dropped (inner joins).
  """
  # Join the large principals table only once and derive both feature sets from it.
  joined = dataframe.join(df_principals.select('tconst', 'ordering', 'category'), on='tconst')

  df_a = joined.groupBy('tconst', 'averageRating', 'numVotes')\
    .agg(_max('ordering').alias('principal_counts'),
         countDistinct('category').alias('distinct_count_categories'))

  df_b = joined.groupBy('tconst').pivot('category').count().fillna(0)

  return df_a.join(df_b, on='tconst')

## Table: AKAS

In [0]:
from pyspark.sql.functions import split, explode, array_distinct
# Hook for sub-sampling the AKAs table during development, e.g.
# df_akas.sample(fraction=0.000001, seed=42); the full table is used by default.
df_akas_sample = df_akas

def akas(dataframe):
  r"""Add per-region AKA counts from `df_akas_sample` to `dataframe`.

  For every title, counts its AKA rows per `region` (one output column per
  region value, 0 when absent), ignoring rows whose region is "\N" or null.
  Titles without any remaining AKA row are dropped (inner join).
  """
  column = 'region'
  df_akas_feature = df_akas_sample.select('titleId', column)
  # Column API instead of a SQL string: the comparison value does not depend on
  # how the SQL parser unescapes backslashes in string literals.
  df_akas_feature = df_akas_feature.where(col(column) != r'\N')
  df_akas_pivot = df_akas_feature.groupBy('titleId').pivot(column).count().fillna(0)
  df_akas_pivot = df_akas_pivot.withColumnRenamed('titleId', 'tconst')
  return dataframe.join(df_akas_pivot, on='tconst')

## Table: Basics

In [0]:
def basics(dataframe):
  r"""Add genre indicator columns from `df_basics` to `dataframe`.

  `genres` is split on ","; each distinct genre becomes a column holding the
  number of matching rows per `tconst` (1 when a title appears once in the
  basics table) and 0 otherwise. The column produced for the "\N" marker is
  dropped. Titles missing from the basics table are dropped (inner join).
  """
  genre_flags = df_basics\
    .select('tconst', explode(array_distinct(split('genres', ','))).alias('genre'))\
    .groupBy('tconst').pivot('genre').count().fillna(0)
  # Titles whose genres field is "\N" produce a "\N" column; it is not a real genre.
  genre_flags = genre_flags.drop(r'\N')

  return dataframe.join(genre_flags, on='tconst')

# Build Feature Table & Train-Test Split

In [0]:
# Build the feature table by passing the sampled ids through each feature stage in order.
feature_stages = [votes, principals, akas, basics]

df_ids = sample(df_ratings)
for stage in feature_stages:
  df_ids = stage(df_ids)

df_ids.printSchema()

In [0]:
# Split into train/test with the configured weights, seeded with random_state.
df_train, df_test = df_ids.randomSplit(weights=[train_size, test_size], seed=random_state)

In [0]:
# Peek at the first training row.
df_train.first()

In [0]:
# Intentionally no code here: writing the train/test splits is handled in the
# "Write to S3" section below. A write at this point would reference an
# undefined `label` and would also make the later write hit an already
# existing output directory.

# Write to S3

In [0]:
def save_to_s3(dataframe, label, mode=None):
  """Write `dataframe` (minus `tconst`) as a single headerless CSV part file.

  Output directory: /mnt/data/niy/<label>/ on the mounted bucket.

  Args:
    dataframe: DataFrame to write; its `tconst` column is dropped first.
    label: name of the output directory under /mnt/data/niy/.
    mode: optional Spark save mode ("overwrite", "append", ...). None keeps
      Spark's default, which fails if the directory already exists.

  Returns:
    The string "Successfully saved to s3".
  """
  (dataframe.drop('tconst')
     .repartition(1)  # one partition -> one part file
     .write.option("header", "false")
     .save(f'/mnt/data/niy/{label}', format='csv', mode=mode))
  return "Successfully saved to s3"

In [0]:
def partition_to_s3(dataframe, label, mode=None):
  """Write `dataframe` (minus `tconst`) as headerless CSV, one file per partition.

  Output directory: /mnt/data/niy/<label>/ on the mounted bucket.

  Args:
    dataframe: DataFrame to write; its `tconst` column is dropped first.
    label: name of the output directory under /mnt/data/niy/.
    mode: optional Spark save mode ("overwrite", "append", ...). None keeps
      Spark's default, which fails if the directory already exists.

  Returns:
    The string "Successfully saved to s3".
  """
  (dataframe.drop('tconst')
     .write.option("header", "false")
     .save(f'/mnt/data/niy/{label}', format='csv', mode=mode))
  return "Successfully saved to s3"

In [0]:
def copyMerge(src_dir, dst_file, overwrite=False, deleteSource=False, debug=False, skipHidden=True):
  """Concatenate the files directly inside `src_dir` into the single file `dst_file`.

  Adapted from the abalon package (https://github.com/Tagar/abalon). Files are
  appended in lexicographic path order; sub-directories are ignored.

  Args:
    src_dir: directory whose direct child files are merged.
    dst_file: path of the merged output file.
    overwrite: passed to FileSystem.create; if False and `dst_file` exists,
      creating it raises.
    deleteSource: if True, recursively delete `src_dir` after merging.
    debug: if True, print progress messages.
    skipHidden: if True (default), skip files whose name starts with "_" or "."
      (Hadoop's hidden-file convention, used for markers such as _SUCCESS) so
      bookkeeping files do not end up inside the merged data file.

  Raises:
    ValueError: if `src_dir` contains no files to merge.
  """
  hadoop = sc._jvm.org.apache.hadoop
  conf = hadoop.conf.Configuration()
  src_path = hadoop.fs.Path(src_dir)
  dst_path = hadoop.fs.Path(dst_file)
  # Resolve the file system from each path instead of using the default FS,
  # so fully-qualified URIs work as well as scheme-less paths.
  src_fs = src_path.getFileSystem(conf)
  dst_fs = dst_path.getFileSystem(conf)

  def _wanted(status):
    # Regular files only; optionally skip hidden/bookkeeping files.
    if not status.isFile():
      return False
    return not (skipHidden and status.getPath().getName().startswith(('_', '.')))

  files = sorted((status.getPath() for status in src_fs.listStatus(src_path) if _wanted(status)), key=str)
  if not files:
    raise ValueError("Source directory {} is empty".format(src_dir))

  out_stream = dst_fs.create(dst_path, overwrite)
  try:
    for file_path in files:
      if debug:
        print("Appending file {} into {}".format(file_path, dst_file))
      in_stream = src_fs.open(file_path)
      try:
        # False: do not close out_stream, the next file is appended to it.
        hadoop.io.IOUtils.copyBytes(in_stream, out_stream, conf, False)
      finally:
        in_stream.close()
  finally:
    out_stream.close()

  if deleteSource:
    src_fs.delete(src_path, True)  # True = recursive
    if debug:
      print("Source directory {} removed.".format(src_dir))

In [0]:
def _write_single_csv(dataframe, label, description):
  """Write `dataframe` via save_to_s3, then copy its part file to /mnt/data/niy/<label>.csv."""
  out_dir = f'/mnt/data/niy/{label}'
  try:
    save_to_s3(dataframe, label)
  except Exception:
    # Presumably the output directory exists from an earlier run: clear it and
    # retry once (a second failure propagates).
    dbutils.fs.rm(out_dir, True)
    save_to_s3(dataframe, label)

  # save_to_s3 writes a single partition, so we expect one part file.
  part_files = [info.path for info in dbutils.fs.ls(out_dir) if info.name.startswith('part-')]
  if not part_files:
    raise FileNotFoundError(f'No part file written to {out_dir}')
  dbutils.fs.cp(part_files[-1], f'/mnt/data/niy/{label}.csv')
  dbutils.fs.rm(part_files[-1])
  print(f'Successfully saved {description} data')


def _write_merged_csv(dataframe, label):
  """Write `dataframe` via partition_to_s3, then merge its part files into /mnt/data/niy/merged/<label>.csv."""
  part_label = f'partitioned_{label}'
  part_dir = f'/mnt/data/niy/{part_label}'
  try:
    partition_to_s3(dataframe, part_label)
  except Exception:
    # Presumably the output directory exists from an earlier run: clear it and retry once.
    dbutils.fs.rm(part_dir, True)
    partition_to_s3(dataframe, part_label)

  # Delete bookkeeping files (names starting with "_", e.g. _SUCCESS) so that
  # only the data part files are merged.
  for info in dbutils.fs.ls(part_dir):
    if info.name.startswith('_'):
      dbutils.fs.rm(info.path)
      print(f'Successfully deleted {info.path}')

  copyMerge(part_dir, f'/mnt/data/niy/merged/{label}.csv', debug=True, overwrite=True, deleteSource=False)


if sample_size < 1:
  ############### WRITE TRAINING / TESTING DATA (single file each) #################
  _write_single_csv(df_train, 'train', 'training')
  _write_single_csv(df_test, 'test', 'testing')

else:
  try:
    dbutils.fs.rm('/mnt/data/niy/merged', True)
    print('Successfully deleted data')
  except Exception:
    print('Data space does not exist yet')

  ############### WRITE TRAINING / TESTING DATA (partitioned, then merged) #################
  # The same helper is used for both splits, so each one writes to its own directory.
  _write_merged_csv(df_train, 'train')
  _write_merged_csv(df_test, 'test')