<a href="https://colab.research.google.com/github/lengochai97/thesis/blob/master/notebooks/feature_construction/31_Samples.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Connect to Google Drive

In [0]:
%%capture

from google.colab import drive

# Mount Google Drive at /content/gdrive; force_remount=True asks Colab to
# mount again even if a previous mount is still present.
drive.mount('/content/gdrive', force_remount=True)

# Install Spark and dependencies

In [0]:
import os

# Versions and install locations, exported as environment variables so the
# shell commands in the install cell can expand them ($SPARK_VERSION, ...).
os.environ.update({
    'HADOOP_VERSION': '2.7',
    'JAVA_HOME': '/usr/lib/jvm/java-8-openjdk-amd64',
    'SPARK_HOME': '/opt/spark',
    'SPARK_VERSION': '2.4.3',
})

In [0]:
%%capture

# Download the Spark release selected by SPARK_VERSION / HADOOP_VERSION.
!wget -qN https://archive.apache.org/dist/spark/spark-$SPARK_VERSION/spark-$SPARK_VERSION-bin-hadoop$HADOOP_VERSION.tgz
# Unpack it into /opt, then delete the downloaded archive.
!tar -xzf spark-$SPARK_VERSION-bin-hadoop$HADOOP_VERSION.tgz -C /opt
!rm spark-$SPARK_VERSION-bin-hadoop$HADOOP_VERSION.tgz
# Recreate /opt/spark (the SPARK_HOME path) as a symlink to the unpacked
# release; removing it first keeps the cell re-runnable.
!rm -rf /opt/spark
!ln -s /opt/spark-$SPARK_VERSION-bin-hadoop$HADOOP_VERSION /opt/spark
# NOTE(review): findspark is installed without a version pin — consider
# pinning it so the notebook stays reproducible.
!pip install -q findspark


# Create SparkSession

In [0]:
import findspark

# Locate the Spark installation (no path is passed, so presumably via the
# SPARK_HOME variable set earlier) and make the pyspark package importable.
findspark.init()

In [0]:
from pyspark.sql import SparkSession

# Local Spark session; 'local[*]' runs Spark in-process on all available cores.
spark = (
    SparkSession.builder
    .master('local[*]')
    .getOrCreate()
)

# Read files

In [0]:
import json

import pyspark.sql.functions as F
import pyspark.sql.types as T

In [0]:
# Root folder on the mounted Google Drive; every input read and every output
# written by this notebook is addressed relative to it.
DATA_PATH = '/content/gdrive/My Drive/dataset/adressa/one_week'

In [0]:
# Rebuild the Spark schema of the 'clean' dataset from its JSON description.
clean_schema_path = os.path.join(DATA_PATH, 'schema', 'clean.json')
with open(clean_schema_path) as schema_file:
  clean_schema = T.StructType.fromJson(json.load(schema_file))

In [0]:
# Read the 'clean' JSON dataset using the explicit schema loaded above
# instead of letting Spark infer one.
df_clean = (
    spark.read
    .schema(clean_schema)
    .json(os.path.join(DATA_PATH, 'clean'))
)

In [0]:
# Rebuild the Spark schema of the 'news_features' dataset from its JSON description.
news_features_schema_path = os.path.join(DATA_PATH, 'schema', 'news_features.json')
with open(news_features_schema_path) as schema_file:
  news_features_schema = T.StructType.fromJson(json.load(schema_file))

In [0]:
# Read the news features with the schema loaded from schema/news_features.json.
# (Passing clean_schema here would silently drop every column that is not
# part of the 'clean' dataset and fill the missing ones with nulls.)
df_news_features = spark.read.json(
    os.path.join(DATA_PATH, 'news_features'),
    schema=news_features_schema,
)

# Construct samples

In [0]:
# Width of one time bucket, expressed in the unit of the `time` column
# (presumably seconds, i.e. one-minute buckets — TODO confirm).
TIME_BUCKET_RANGE = 60

In [0]:
# Keep only the fields needed for sampling and tag each event with its time
# bucket: time / TIME_BUCKET_RANGE, converted to a long by the cast.
df_event = df_clean.select(
    'userId',
    'time',
    (F.col('time') / TIME_BUCKET_RANGE).cast('long').alias('timeBucket'),
    'newsId',
)

## Click

In [0]:
# Every observed event becomes a positive sample (clickLabel = 1).
df_sample_click = (
    df_event
    .select('userId', 'time', 'newsId')
    .withColumn('clickLabel', F.lit(1))
)

## No-Click

In [0]:
from pyspark.ml.feature import IndexToString, StringIndexer
from pyspark.sql import Window

In [0]:
# Fit a newsId -> numeric index mapping (alphabetical order) on the ids found
# in the news features, so ids can be handled as numbers in array operations.
news_id_indexer = StringIndexer(
    inputCol='newsId',
    outputCol='newsIdIndexed',
    stringOrderType='alphabetAsc',
).fit(df_news_features.select('newsId'))

# Inverse of news_id_indexer: maps newsIdIndexed back to the newsId string by
# reusing the labels learned during fitting.
news_id_indexer_converter = IndexToString(
    inputCol='newsIdIndexed',
    outputCol='newsId',
    labels=news_id_indexer.labels,
)

In [0]:
# One row per user with the distinct set of indexed news ids the user clicked.
df_click_list = (
    news_id_indexer.transform(df_event)
    .groupBy('userId')
    .agg(F.collect_set(F.col('newsIdIndexed')).alias('clickList'))
)

In [0]:
# Candidate news per time bucket: ids that were clicked in at least one
# strictly earlier bucket AND in at least one strictly later bucket.
# NOTE: both windows are ordered but not partitioned, so Spark evaluates them
# on a single partition.
buckets_before = (
    Window
    .orderBy('timeBucket')
    .rangeBetween(Window.unboundedPreceding, -1)
)
buckets_after = (
    Window
    .orderBy('timeBucket')
    .rangeBetween(1, Window.unboundedFollowing)
)

df_candidate_list = (
    news_id_indexer.transform(df_event)
    # Distinct ids clicked inside each bucket.
    .groupBy('timeBucket')
    .agg(F.collect_set(F.col('newsIdIndexed')).alias('newsIdIndexedList'))
    # Ids clicked in any earlier bucket: gather the per-bucket sets, then
    # flatten and de-duplicate them.
    .withColumn('pastList', F.collect_list(F.col('newsIdIndexedList')).over(buckets_before))
    .withColumn('pastList', F.array_distinct(F.flatten(F.col('pastList'))))
    # Ids clicked in any later bucket, built the same way.
    .withColumn('futureList', F.collect_list(F.col('newsIdIndexedList')).over(buckets_after))
    .withColumn('futureList', F.array_distinct(F.flatten(F.col('futureList'))))
    # Spread the rows over partitions again before the intersection.
    .repartitionByRange('timeBucket')
    .withColumn('candidateList', F.array_intersect(F.col('pastList'), F.col('futureList')))
    .select('timeBucket', 'candidateList')
)

In [0]:
# Maximum number of no-click (negative) samples drawn for each click event.
NO_CLICK_SAMPLES_PER_EVENT = 11

# For every click event, draw random news ids that were candidates in the
# event's time bucket but that the user never clicked.
df_sample_no_click_indexed = (
    df_event
    .select(
        F.column('userId'),
        F.column('time'),
        F.column('timeBucket'),
    )
    .join(df_click_list, on='userId', how='inner')
    .join(df_candidate_list, on='timeBucket', how='inner')
    # Candidates of the bucket minus everything this user clicked.
    .withColumn(
        'noClickList',
        F.array_except(F.column('candidateList'), F.column('clickList')),
    )
    # Random order; F.shuffle takes no seed here, so the drawn negatives
    # differ from run to run.
    .withColumn(
        'noClickList',
        F.shuffle(F.column('noClickList')),
    )
    # Keep at most NO_CLICK_SAMPLES_PER_EVENT ids (slice is 1-based: start, length).
    .withColumn(
        'noClickList',
        F.slice(F.column('noClickList'), 1, NO_CLICK_SAMPLES_PER_EVENT),
    )
    # One row per sampled id; events whose list is empty produce no rows.
    .withColumn(
        'newsIdIndexed',
        F.explode('noClickList'),
    )
    .select(
        F.column('userId'),
        F.column('time'),
        F.column('newsIdIndexed'),
    )
)

# Translate the numeric indices back to newsId strings and label the rows as
# negatives (clickLabel = 0), matching the column layout of df_sample_click.
df_sample_no_click = (
    news_id_indexer_converter
    .transform(df_sample_no_click_indexed)
    .select(
        F.column('userId'),
        F.column('time'),
        F.column('newsId'),
        F.lit(0).alias('clickLabel'),
    )
)

## Union

In [0]:
# Stack positive and negative samples. unionByName matches columns by name
# rather than by position, so the result stays correct even if the column
# order of one side is changed later.
df_sample = df_sample_click.unionByName(df_sample_no_click)

# Write files

In [0]:
# Co-locate all samples of one (userId, time) event in the same partition,
# then order each partition by time, user, label (clicks before no-clicks)
# and finally newsId.
df_sample = (
    df_sample
    .repartition('userId', 'time')
    .sortWithinPartitions(
        F.col('time').asc(),
        F.col('userId').asc(),
        F.col('clickLabel').desc(),
        F.col('newsId').asc(),
    )
)

In [0]:
%%time

# Write the samples as JSON. 'errorifexists' is Spark's default save mode,
# spelled out here: the write fails if the target folder already exists.
df_sample.write.mode('errorifexists').json(os.path.join(DATA_PATH, 'samples'))

CPU times: user 76.6 ms, sys: 18.9 ms, total: 95.4 ms
Wall time: 7min 10s


In [0]:
# Save the schema of df_sample as JSON next to the other schema files.
samples_schema_path = os.path.join(DATA_PATH, 'schema', 'samples.json')
with open(samples_schema_path, 'w+') as schema_file:
  json.dump(df_sample.schema.jsonValue(), schema_file)