## Data Preparation
Pre-process the data for the machine learning model:

1. Define variables
2. Load the datasets
3. Convert string to list and remove punctuations from text
4. Store datasets



In [None]:
import os
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType, ArrayType, DoubleType, MapType
from pyspark.sql import functions as F
import re

spark.conf.set("spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation","true")

### Define variables (input folder, datasets, ....)


In [None]:
# Define adls path and datset filename
account_name = ""
container = "cms"

input_train_folder = 'MINDsmall_train'
input_dev_folder = 'MINDsmall_dev'
input_test_folder = 'MINDlarge_test'

ds_name_activity = 'behaviors.tsv'
df_name_news = 'news.tsv'

# Azure Storage path
adls_path = "abfss://%s@%s.dfs.core.windows.net/MicrosoftNewsDataset/" % (container, account_name)



### Load the news and behavior datasets
The news dataset contains all the metadata of an article and the behavior dataset contains the activity of the users

In [None]:
# Load data into Spark Table


#### Behavior dataset
# Define schema 
schema = StructType([
    StructField("Impression_ID", IntegerType(), True),
    StructField("User_ID", StringType(), True),
    StructField("Time", StringType(), True),
    StructField("History", StringType(), True),
    StructField("Impressions", StringType(), True)])

# load train dataset
df_activity_train = spark.read.csv(
    os.path.join(adls_path,input_train_folder,ds_name_activity),
    inferSchema=True,
    sep='\t', 
    header=False,
    schema=schema
    )

# load dev dataset
df_activity_dev = spark.read.csv(
    os.path.join(adls_path,input_dev_folder,ds_name_activity),
    inferSchema=True,
    sep='\t', 
    header=False,
    schema=schema
    )

# load test dataset
df_activity_test = spark.read.csv(
    os.path.join(adls_path,input_test_folder,ds_name_activity),
    inferSchema=True,
    sep='\t', 
    header=False,
    schema=schema
    )

# keep only a fraction of the total test dataset 
fraction = df_activity_train.count() / df_activity_test.count()
df_activity_test = df_activity_test.sample(fraction=fraction, seed=2020)

# Keep link to dataset in dictionary
dict_data = {'train': df_activity_train, 'test': df_activity_test, 'dev': df_activity_dev}

###### News Dataset
# Define Schema
schema = StructType([
    StructField("News_ID", StringType(), True),
    StructField("Category", StringType(), True),
    StructField("SubCategory", StringType(), True),
    StructField("Title", StringType(), True),
    StructField("Abstract", StringType(), True),
    StructField("URL", StringType(), True),
    StructField("Title_Entities", StringType(), True),
    StructField("Abstract_Entities", StringType(), True)
    ])

# Load data into Spark Table
df_news_train = spark.read.csv(
    os.path.join(adls_path,input_train_folder,df_name_news),
    # inferSchema=True,
    sep='\t', 
    header=False,
    schema=schema
    )

# load dev dataset
df_news_dev = spark.read.csv(
    os.path.join(adls_path,input_dev_folder,df_name_news),
    # inferSchema=True,
    sep='\t', 
    header=False,
    schema=schema
    )

# load test dataset
df_news_test = spark.read.csv(
    os.path.join(adls_path,input_test_folder,df_name_news),
    # inferSchema=True,
    sep='\t', 
    header=False,
    schema=schema
    )
    
# Keep link to dataset in dictionary
dict_news = {'train': df_news_train, 'test': df_news_test, 'dev': df_news_dev}
 

## Process datasets

The code removes all punctuation from the columns containing textual information like the abstract. Moreover, it converts the cleaned texted into a list of strings. This will facilitate the convertion of strings to integers as an ML model can only treat numerical data 

In [None]:
# helper function to remove punctuations
def removePunctuation(text):
    if text is not None:
        text=text.lower().strip()
        text=re.sub("[^0-9a-zA-Z ]","", text)
        return text
    return 'NoInfo'

# helper function
# convert a string list to actual list
def convert_string_to_list(x):
    if x is not None:
        lst = x.split(' ')
        return lst
    return ['NoInfo']

#Define user-define-function to apply on a spark dataframe
udf_punc_remover = F.udf(lambda row: removePunctuation(row),StringType())
udf_csl = F.udf(lambda row: convert_string_to_list(row),ArrayType(StringType()))

#Define dictionary of results datasets
results = {'train': None,'dev': None,'test':None}

# Loop over the train, dev, test dataset to apply preprocessing
for key, df_activity in dict_data.items():

    # Convert string of past articles to list
    df_preprocess = df_activity.withColumn('Impressions',udf_csl('Impressions'))
    df_preprocess = df_preprocess.withColumn('History',udf_csl('History'))

    # Extract list of past articles into a table of 1 User - 1 article
    df_preprocess = df_preprocess.withColumn('flat_impressions',F.explode('Impressions'))

    # Extract News ID
    df_preprocess = df_preprocess.withColumn('News_ID',F.split(F.col("flat_impressions"), "-").getItem(0))
    if key != 'test':
        # Extract target variables from news (whether the article was clicked on or not)
        df_preprocess = df_preprocess.withColumn('Clicked',F.split(F.col("flat_impressions"), "-").getItem(1).cast(IntegerType()))
    
    # drop temporary column
    df_preprocess = df_preprocess.drop('flat_impressions')

    # Join activity dataset with news metadata dataset
    df_preprocess = df_preprocess.join(dict_news[key],on=['News_ID'],how='inner')

    # Remove strings from IDs and convert them to integers
    df_preprocess = df_preprocess.withColumn('News_ID',F.translate("News_ID", "N", "").cast(IntegerType()))
    df_preprocess = df_preprocess.withColumn('User_ID',F.translate("User_ID", "U", "").cast(IntegerType()))

    # Remove punctuations from title and abastract
    df_preprocess = df_preprocess.withColumn("Title", udf_punc_remover("Title"))
    df_preprocess = df_preprocess.withColumn("Abstract", udf_punc_remover("Abstract"))

    #Drop any rows that contains null information
    df_preprocess = df_preprocess.dropna()

    # Keep results in dictionary
    results[key] = df_preprocess


## Store datasets


In [None]:
# Write dataset to spark table
results['train'].write.mode('overwrite').saveAsTable('default.ActivityTrain')
results['test'].write.mode('overwrite').saveAsTable('default.ActivityTest')
results['dev'].write.mode('overwrite').saveAsTable('default.ActivityDev')
