# Comics Rx
## [A comic book recommendation system](https://github.com/MangrobanGit/comics_rx)
<img src="https://images.unsplash.com/photo-1514329926535-7f6dbfbfb114?ixlib=rb-1.2.1&ixid=eyJhcHBfaWQiOjEyMDd9&auto=format&fit=crop&w=2850&q=80" width="400" align='left'>

---

# ALS Model - Reduced Data - EDA, Prep

As explored in the EDA notebook, this time let's consider removing customers who have too few or too many purchases to influence the model in the intended way.

Examples:
- Too few - Customers who have only bought 1 comic (series).
- Too many - Customers with > 1000 series (e.g., an account under which all eBay customers are rolled into one account number).
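The account filter itself was applied upstream, before `trans_filtered.json` was written. As a minimal pandas sketch of the idea, assuming the cutoffs listed near the end of this notebook (>= 5 and <= 300 transactions per account) and counting transaction rows rather than distinct series, it could look like this. `filter_accounts` and the toy data are hypothetical:

```python
import pandas as pd

# Cutoffs from the filter summary later in this notebook.
MIN_TRANS = 5
MAX_TRANS = 300


def filter_accounts(trans: pd.DataFrame,
                    min_trans: int = MIN_TRANS,
                    max_trans: int = MAX_TRANS) -> pd.DataFrame:
    """Keep only accounts whose transaction-row count is within [min, max]."""
    # Map each row's account to that account's total number of rows.
    n_trans = trans['account_num'].map(trans['account_num'].value_counts())
    # between() is inclusive on both ends by default.
    return trans[n_trans.between(min_trans, max_trans)]


# Toy data: 'A' has 1 row (too few), 'B' has 6 rows (kept).
toy = pd.DataFrame({
    'account_num': ['A'] + ['B'] * 6,
    'comic_title': ['X', 'Y', 'Z', 'Y', 'W', 'X', 'V'],
})
kept = filter_accounts(toy)
print(kept['account_num'].unique())  # only 'B' survives
```

To count distinct series instead of rows, replace the `value_counts()` mapping with `trans.groupby('account_num')['comic_title'].transform('nunique')`.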

# Libraries

In [1]:
%matplotlib inline
%load_ext autoreload
# Mode 2 reloads all modules before running code; mode 1 only reloads the
# modules you register with %aimport.
%autoreload 2
#%aimport data_fcns

import os
import time

import numpy as np
import pandas as pd  # dataframes

# Data storage
from sqlalchemy import create_engine  # SQL helper
try:
    import psycopg2 as psql  # PostgreSQL DBs (not needed when reading from JSON)
except ModuleNotFoundError:
    psql = None

# Spark
import pyspark
from pyspark.sql import SparkSession
# from pyspark.sql.types import (StructType, StructField, IntegerType
#                                ,FloatType, LongType, StringType)
from pyspark.sql.types import IntegerType

import pyspark.sql.functions as F
from pyspark.sql.functions import col, explode, lit, isnan, when, count
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.evaluation import (RegressionEvaluator,
                                   BinaryClassificationEvaluator)
from pyspark.ml.tuning import (CrossValidator, ParamGridBuilder,
                               TrainValidationSplit)

In [None]:
import sys

In [None]:
sys.path.append('..')

In [None]:
# Custom
import data_fcns as dfc
import keys  # Custom keys lib
import comic_recs as cr

In [None]:
# # instantiate SparkSession object
# spark = pyspark.sql.SparkSession.builder.master("local[*]").getOrCreate()
# # spark = SparkSession.builder.master("local").getOrCreate()

In [None]:
from pyspark import SparkConf

conf = SparkConf()

conf = (conf.setMaster('local[*]')
#         .set('spark.executor.memory', '1G') #https://stackoverflow.com/questions/48523629/spark-pyspark-an-error-occurred-while-trying-to-connect-to-the-java-server-127
        .set('spark.driver.memory', '7G')
        .set('spark.driver.maxResultSize', '4G'))
#         .set('spark.executor.memory', '1G')
#         .set('spark.driver.memory', '10G')
#         .set('spark.driver.maxResultSize', '5G'))

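# getOrCreate is a class method: calling SparkContext() first would start a
# context with the default conf and silently ignore `conf`.
sc = pyspark.SparkContext.getOrCreate(conf=conf)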

from pyspark.sql import SQLContext
sql_context = SQLContext(sc)

sc.setCheckpointDir('./checkpoints')

# spark.sparkContext.setCheckpointDir("hdfs://datalake/check_point_directory/als")

## Import the data

There is a way to read from PostgreSQL directly through JDBC, but I don't know how to do that yet. As a workaround, I saved the candidate dataset to JSON and will use that as input to Spark.
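For reference, here is a minimal sketch of the JDBC route. It assumes a hypothetical host, database name, table name, and credentials, and assumes the PostgreSQL JDBC driver jar has been made available to Spark (for example through the `spark.jars.packages` setting). `postgres_jdbc_options` is a made-up helper; the `format('jdbc').options(...).load()` chain is Spark's standard `DataFrameReader` API:

```python
def postgres_jdbc_options(host, port, database, table, user, password):
    """Build options for Spark's built-in 'jdbc' data source (PostgreSQL).

    Every argument is a placeholder; none of these values come from the project.
    """
    return {
        'url': f'jdbc:postgresql://{host}:{port}/{database}',
        'dbtable': table,
        'user': user,
        'password': password,
        # The PostgreSQL JDBC driver jar must be on Spark's classpath.
        'driver': 'org.postgresql.Driver',
    }


opts = postgres_jdbc_options('localhost', 5432, 'comics_db',
                             'trans_filtered', 'analyst', 'change-me')

# With a reachable database and the driver jar available, the read becomes:
# trans = sql_context.read.format('jdbc').options(**opts).load()
```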


In [None]:
# We have previously created a version of the transactions table 
# and filtered it down.
trans = sql_context.read.json('raw_data/trans_filtered.json')

In [None]:
# Persist the data
trans.persist()

In [None]:
print(trans.count(), len(trans.columns))

In [None]:
# check schema
trans.printSchema()

### More exploration/testing

We won't be using pandas dataframes in the matrix factorization through Spark, but let's cast to one anyway as it will be easier to work with for EDA.

In [None]:
# cast to Pandas dataframe to turn timestamp data to datetime and check nulls. 
trans_df = trans.select('*').toPandas()
trans_df.info()

In [None]:
# Let's double check the data is how we expect it
trans_df.head()

In [None]:
trans_df['dt'] = pd.to_datetime(trans_df['date_sold'], unit='ms')

Yes. I cross-checked this against the original transactions dataframe in the other notebook, and the datetime conversion is correct.
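`date_sold` arrives from JSON as an integer count of milliseconds since the Unix epoch, which is why `unit='ms'` is needed. A toy check with made-up values shows that leaving the unit off silently lands everything in 1970, because pandas treats bare integers as nanoseconds:

```python
import pandas as pd

# Made-up epoch-millisecond values.
date_sold = pd.Series([0, 86_400_000, 1_568_592_000_000])

dt = pd.to_datetime(date_sold, unit='ms')   # 1970-01-01, 1970-01-02, 2019-09-16
wrong = pd.to_datetime(date_sold)           # integers default to nanoseconds
print(dt)
print(wrong)
```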

### Data Prep for ALS

Let's aggregate the data to the three columns we need:
- `account_num` - This is the identifier for individual customers.
- `comic_title` - The comic. Represents individual volumes/runs of a comic.
- `score` - We need to figure out what to use as a `score`. If these were Amazon items, review scores would be a natural fit, but we don't have those. We could use a binary `bought`/`not bought` flag, or we could use `qty_sold`. The latter might capture some interesting behavior from comic 'collectors/speculators'. Since this is a first pass, I'm curious what `qty_sold` might do!

We only care about `account_num`, `comic_title` and `qty_sold`.
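To make the two `score` candidates concrete, here is a small pandas sketch on made-up rows. It mirrors the Spark `groupBy`/`agg` step below: sum `qty_sold` per account-comic pair, then attach a constant `bought = 1` flag to every pair that exists:

```python
import pandas as pd

# Toy transactions with the three columns kept above.
sales = pd.DataFrame({
    'account_num': ['A', 'A', 'A', 'B'],
    'comic_title': ['X', 'X', 'Y', 'X'],
    'qty_sold':    [1, 2, 1, 5],
})

# One row per (account, comic) pair, like the Spark groupBy/agg below.
scores = (sales.groupby(['account_num', 'comic_title'], as_index=False)
               .agg(qty_sold=('qty_sold', 'sum')))

# Candidate score 1: total quantity bought (rewards repeat/bulk buying).
# Candidate score 2: a binary flag, 1 for every pair that appears at all.
scores['bought'] = 1
print(scores)
```

With `qty_sold`, a speculator buying 5 copies of one issue weighs five times as much as a single purchase; with `bought`, every pair counts equally.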

In [None]:
comics_sold = trans[['account_num', 'comic_title', 'qty_sold']]
comics_sold.persist()

In [None]:
comics_sold = comics_sold.withColumn('bought', lit(1))

In [None]:
comics_sold.show(10)

In [None]:
total_comics_sold = ( comics_sold.groupBy(['account_num', 'comic_title'])
                               .agg({'qty_sold':'sum'})
                    )
total_comics_sold.persist()

Ok, let's take a look at the results.

In [None]:
total_comics_sold.show(10)

In [None]:
print(total_comics_sold.count(), len(total_comics_sold.columns))

In [None]:
total_comics_sold = total_comics_sold.withColumn('bought', lit(1))

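The aggregated column gets the default name `sum(qty_sold)`, which I don't like; `withColumnRenamed('sum(qty_sold)', 'qty_sold')` would restore the original name. Since the score here is the binary `bought` flag, though, the column is dropped a couple of cells down.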

In [None]:
total_comics_sold.show(10)

In [None]:
cols = ['account_num', 'comic_title', 'bought']
total_comics_sold = total_comics_sold[cols]

In [None]:
print(total_comics_sold.count(), len(total_comics_sold.columns))

### Formatting

Sooooooo, I forgot that the values need to be numeric: Spark's ALS expects integer user and item IDs. So we need to fix that.
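`dfc.make_int` lives in the custom `data_fcns` module, which isn't shown here. As a hypothetical stand-in, assuming account numbers are strings that contain digits (possibly with padding or a prefix), something like this would work with the `F.udf(..., IntegerType())` call below:

```python
import re


def make_int(account_num):
    """Hypothetical stand-in for dfc.make_int: digits of the value as an int.

    Returns None (a null once wrapped in a Spark UDF) if there are no digits.
    """
    if account_num is None:
        return None
    digits = re.sub(r'\D', '', str(account_num))  # drop every non-digit
    return int(digits) if digits else None


# Registered the same way as in the next cell:
# to_int_udf = F.udf(make_int, IntegerType())
```

Two caveats with this approach: stripping characters means `'0123'` and `'123'` collide, and ALS IDs must fit in a 32-bit integer, so very long account numbers would need a lookup table instead (like the one built for comics below).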

#### Convert `account_num` to an integer `account_id`

In [None]:
to_int_udf = F.udf(dfc.make_int, IntegerType())

In [None]:
account_num_col = total_comics_sold['account_num']

In [None]:
total_comics_sold = total_comics_sold.withColumn('account_id'
                                        ,to_int_udf(account_num_col))
total_comics_sold.persist()

In [None]:
total_comics_sold.show(10)

In [None]:
print(total_comics_sold.count(), len(total_comics_sold.columns))

Now I need to give IDs to each `comic_title`. It's a bit clunky, but I already have a version of the big table in PostgreSQL, so I can build an ID table there as the source of truth. I could do this on the PySpark side, but I'd want to save the result somewhere (e.g. the DB) anyway, so I might as well do it at the source.
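For comparison, the Python-side alternative mentioned above could be as small as this sketch (toy titles, not project data). `pd.factorize(..., sort=True)` numbers the distinct titles alphabetically, so the same set of titles always gets the same IDs. Adding a new title would shift them, though, which is one argument for keeping a persisted ID table as the source of truth:

```python
import pandas as pd

titles = pd.Series(['Saga', 'Batman', 'Saga', 'X-Men', 'Batman'])

# codes[i] is the id of titles[i]; uniques lists the distinct titles in
# sorted order, so uniques[k] is the title with id k.
codes, uniques = pd.factorize(titles, sort=True)

# Lookup table in the same shape as `comics` (comic_id, comic_title).
comic_lookup = pd.DataFrame({'comic_id': range(len(uniques)),
                             'comic_title': uniques})
print(comic_lookup)
```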

#### Get `comic_id`

In [None]:
comics = sql_context.read.json('raw_data/comics.json')
comics.persist()

In [None]:
comics.count()

In [None]:
comics.show(10)

In [None]:
print(comics.count(), len(comics.columns))

Now we need to join this back into `total_comics_sold`.
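One thing worth checking before the inner join: it silently drops sales whose title has no match in `comics`, and it duplicates rows if a title appears more than once there. A small pandas sketch on made-up rows that surfaces both problems, using `merge`'s `validate` and `indicator` options:

```python
import pandas as pd

sold = pd.DataFrame({'account_id': [1, 1, 2],
                     'comic_title': ['Saga', 'Batman', 'Unlisted'],
                     'bought': [1, 1, 1]})
comics = pd.DataFrame({'comic_id': [10, 11],
                       'comic_title': ['Batman', 'Saga']})

# validate='many_to_one' raises MergeError if a title maps to two comic_ids;
# indicator=True adds a '_merge' column saying where each row was found.
checked = sold.merge(comics, on='comic_title', how='left',
                     validate='many_to_one', indicator=True)
unmatched = checked.loc[checked['_merge'] == 'left_only', 'comic_title']
ids_only = checked.loc[checked['_merge'] == 'both',
                       ['account_id', 'comic_id', 'bought']]

# A duplicated title in the lookup table trips the validation.
dupes = pd.concat([comics, pd.DataFrame({'comic_id': [12],
                                         'comic_title': ['Saga']})])
raised = False
try:
    sold.merge(dupes, on='comic_title', validate='many_to_one')
except pd.errors.MergeError:
    raised = True
```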

In [None]:
# Set aliases
tot = total_comics_sold.alias('tot')
com = comics.alias('com')

In [None]:
tot_sold_ids_only = tot.join(com.select('comic_id','comic_title')
                      ,tot.comic_title==com.comic_title).select('account_id'
                                                                , 'comic_id'
                                                                , 'bought')
tot_sold_ids_only.persist()
tot_sold_ids_only.show(10)

In [None]:
tot_sold_ids_only.printSchema()

In [None]:
print(tot_sold_ids_only.count(), len(tot_sold_ids_only.columns))

## Create table with zeros
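Here is the idea of this section in miniature, as a pandas sketch on toy IDs. The Spark cells below use `crossJoin` and a left join instead. The steps are: build every account-comic pair, left-join the observed purchases, and fill the missing `bought` values with 0. `how='cross'` requires pandas 1.2 or later:

```python
import pandas as pd

sold = pd.DataFrame({'account_id': [1, 2], 'comic_id': [10, 11],
                     'bought': [1, 1]})
acct_ids = sold[['account_id']].drop_duplicates()
comic_ids = sold[['comic_id']].drop_duplicates()

# Every (comic, account) pair, like Spark's crossJoin.
all_combos = comic_ids.merge(acct_ids, how='cross')

# Left join the observed purchases; pairs with no purchase get bought = 0.
final_combos = (all_combos
                .merge(sold, on=['comic_id', 'account_id'], how='left')
                .fillna({'bought': 0})
                .astype({'bought': int}))
print(final_combos)
```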

In [None]:
# Get all accounts
acct_ids = tot_sold_ids_only.select("account_id").distinct().persist()
acct_ids.show()

In [None]:
# Get just comic_ids
comic_ids = comics.select("comic_id").distinct().persist()
comic_ids.show()

In [None]:
comic_ids.count()

In [None]:
acct_ids.count()

In [None]:
tot_sold_ids_only.show()

### Limit comic Ids to model

I think keeping comics with only a handful of sales will add noise. More pragmatically, fewer comics means a smaller matrix, which makes the model less resource-intensive.

Arbitrarily going to pick comics bought by >= 20 accounts for now. Each row of `tot_sold_ids_only` is one account-comic pair, so the row count per comic is its number of distinct buyers.
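A pandas sketch of the same floor on made-up pairs; `popular_comic_ids` is a hypothetical helper, and the Spark version follows:

```python
import pandas as pd

MIN_BUYERS = 20  # the notebook's arbitrary floor


def popular_comic_ids(pairs: pd.DataFrame,
                      min_buyers: int = MIN_BUYERS) -> pd.Index:
    """Return comic_ids that appear in at least `min_buyers` rows.

    `pairs` has one row per (account_id, comic_id), so the row count for a
    comic equals its number of distinct buyers.
    """
    counts = pairs['comic_id'].value_counts()
    return counts[counts >= min_buyers].index


# Toy pairs: comic 1 has 25 buyers, comic 2 has 3.
pairs = pd.DataFrame({
    'account_id': list(range(25)) + [0, 1, 2],
    'comic_id': [1] * 25 + [2] * 3,
})
keep = popular_comic_ids(pairs)
floored = pairs[pairs['comic_id'].isin(keep)]
```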

In [None]:
comic_ids = (tot_sold_ids_only.groupBy("comic_id").count()
             .filter(col('count') >= 20)
             .select("comic_id"))

In [None]:
comic_ids.show()

In [None]:
comic_ids.count()

#### Convert to pandas

In [None]:
comic_ids_df = comic_ids.toPandas()

#### How many records are there after we limit to comics with a minimum number of sales?

In [None]:
trans_df.shape

In [None]:
comic_ids_df.shape

In [None]:
trans_df.columns

Make a pandas df of `comics`

In [None]:
comics_df = comics.toPandas()

In [None]:
comics_df.head()

In [None]:
comics_df_filtered = comics_df.merge(comic_ids_df, on="comic_id", how="inner")

In [None]:
comics_df_filtered.shape

In [None]:
comics_df_filtered.head()

In [None]:
comics_df_filtered.columns

In [None]:
tot_sold_ids_only.count()

In [None]:
sold_ids_df = tot_sold_ids_only.toPandas()

In [None]:
sold_ids_df.head()

In [None]:
sold_df_floored = sold_ids_df.merge(comics_df_filtered, on="comic_id",
                                    how="inner")

In [None]:
sold_df_floored.head()

#### How many account-comic combos are there after filtering?

In [None]:
sold_df_floored.shape[0]

In [None]:
trans_floored = trans_df.merge(comics_df_filtered, on="comic_title",
                               how="inner")

In [None]:
trans_floored.head()

In [None]:
len(trans_floored['account_num'].unique())

#### Q: How many transactions after all filters?
- Accounts with >= 5 transactions and <= 300 transactions
- Comics that have been bought by >= 20 accounts

In [None]:
trans_floored.shape[0]

#### Q: Number of comic - account combos?

In [None]:
sold_df_floored.shape

#### Q: Number of unique accounts before the comic filter?

In [None]:
len(sold_ids_df['account_id'].unique())

#### Q: Number of unique accounts after the comic filter?

In [None]:
len(sold_df_floored['account_id'].unique())

In [None]:
# comic_ids_df.to_json('raw_data/comic_ids.json', orient='records'
#                      ,lines=True)

comic_ids_df.to_json('support_data/comic_ids.json', orient='records'
                     ,lines=True)

In [None]:
comic_ids_df.head()

In [None]:
acct_ids.count()

In [None]:
total_combos = comic_ids.count() * acct_ids.count()
total_combos

In [None]:
# Join together
all_combos = comic_ids.crossJoin(acct_ids).persist()

all_combos.count()

In [None]:
sold = tot_sold_ids_only.alias("sold")

In [None]:
tot_sold_ids_only.columns

In [None]:
final_combos = all_combos.join(sold, [sold.comic_id == all_combos.comic_id
                                      ,sold.account_id == all_combos.account_id], 
                              "left").select(all_combos.comic_id
                                             ,all_combos.account_id
                                             ,sold.bought).fillna(0).persist()

In [None]:
final_combos.show()

In [None]:
final_combos.count()

We have about 850K potential `account`, `comic` combinations.

Let's take a look at the sparsity of the matrix.
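Sparsity here is 1 - (observed account-comic pairs) / (accounts x comics). One subtlety: the numerator should count only purchases that fall inside the filtered matrix, i.e. the non-zero cells of `final_combos`. A toy pandas sketch of the calculation:

```python
import pandas as pd

# Toy version of final_combos: 2 accounts x 3 comics = 6 cells, 2 purchases.
final_combos = pd.DataFrame({
    'comic_id':   [10, 10, 11, 11, 12, 12],
    'account_id': [1, 2, 1, 2, 1, 2],
    'bought':     [1, 0, 0, 0, 0, 1],
})

n_cells = (final_combos['comic_id'].nunique()
           * final_combos['account_id'].nunique())
# Count only cells inside the matrix that hold a purchase.
n_observed = int((final_combos['bought'] > 0).sum())

density = n_observed / n_cells   # share of populated cells
sparsity = 1 - density           # share of empty cells
```

Because the table has exactly one row per cell, `final_combos['bought'].mean()` gives the same density directly.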

In [None]:
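# Count only the purchases that fall inside the filtered matrix: `sold`
# still includes comics below the 20-buyer floor.
sparse_numerator = final_combos.filter(col('bought') > 0).count()
sparse_denominator = final_combos.count()
sparsity = 1 - (sparse_numerator/sparse_denominator)
sparsity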

So about 7.5% populated. Not bad.

In [None]:
#df2.coalesce(1).write.format('json').save('/path/file_name.json')
#final_combos.write.format('json').save('raw_data/als_input_filtered_190915.json')

## Save this intermediate table

To save work, if needed.

In [None]:
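# saveAsPickleFile fails if the output directory already exists, so clear it
# first (-f keeps rm quiet when there is nothing to remove).
!rm -rf raw_data/als_input_filtered_190916.pkl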

In [None]:
final_combos.rdd.saveAsPickleFile('raw_data/als_input_filtered_190916.pkl')

Test reconstituting the pickle

In [None]:
#pickleRdd = sc.pickleFile('raw_data/als_input_filtered_190916.pkl').collect()
#df2 = sql_context.createDataFrame(pickleRdd)

In [None]:
als_data = final_combos.toPandas()

In [None]:
als_data.to_json('raw_data/als_input_filtered_190915.json', orient='records'
                     ,lines=True)
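Spark's JSON reader expects one JSON object per line by default, so `orient='records', lines=True` is the matching pandas layout. A quick round-trip check on a toy frame (written to a temporary directory, not the project's `raw_data/`) confirms that a file in this layout reloads to the same data:

```python
import os
import tempfile

import pandas as pd

# Toy stand-in for als_data.
als_data = pd.DataFrame({'comic_id': [10, 10, 11],
                         'account_id': [1, 2, 1],
                         'bought': [1, 0, 0]})

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'als_input.json')
    # One JSON object per line: the layout Spark's JSON reader expects by default.
    als_data.to_json(path, orient='records', lines=True)
    reloaded = pd.read_json(path, orient='records', lines=True)

# Raises AssertionError if values, dtypes, or column order differ.
pd.testing.assert_frame_equal(als_data, reloaded)
```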

Test loading the pickled item factors and joining them back to the comic details.

In [None]:
unpickled_items = pd.read_pickle('support_data/item_factors_20190916.pkl')

In [None]:
comics_df.head()

In [None]:
unpickled_items.head()

In [None]:
ddd = unpickled_items.merge(comics_df, left_on='id', right_on='comic_id', how="inner")

In [None]:
ddd.head(20)