# Pulling data from the MAG database to create training data

In [None]:
import pickle
import boto3
import pandas as pd
import numpy as np

In [None]:
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (ArrayType, DoubleType, FloatType, IntegerType,
                               StringType, StructField, StructType)
from pyspark.sql.window import Window

# `spark` is not defined in this notebook; presumably the SparkSession is
# pre-created by the notebook kernel (e.g. EMR/Databricks) — confirm.
sc = spark.sparkContext
# NOTE(review): SQLContext is deprecated since Spark 3.0 in favour of the
# SparkSession itself; kept because later cells may reference `sqlContext`.
sqlContext = SQLContext(sc)

In [None]:
# S3 prefixes used throughout the notebook. Both end with "/" because later
# cells append names directly, e.g. f"{base_save_path}train".
_mag_bucket_uri = "s3://mag-model-data/"
base_save_path = f"{_mag_bucket_uri}raw_mag_data/"  # Redshift temp dir + raw splits
iteration_save_path = f"{_mag_bucket_uri}iteration_final/"  # vocabs + tokenized data

## First Data Pull

In [None]:
# Sanity check: run a row count on mag_main_papers through the Redshift
# connector (results are unloaded to `tempdir` in S3 and read back by Spark).
df = (
    spark.read
    .format("com.databricks.spark.redshift")
    .options(
        url=redshift_url,
        user="app_user",
        password=redshift_password,
        query="select count(*) from mag_main_papers",
        tempdir=base_save_path,
        forward_spark_s3_credentials=True,
    )
    .load()
)

In [None]:
# Display the row-count result (explicit PySpark defaults: 20 rows, truncated).
df.show(n=20, truncate=True)

## Getting all data

In [None]:
# Redshift query producing one row per (paper_id, doc_type, paper_title,
# journal_name, publication_date) group, with all of the paper's
# field-of-study names concatenated with '|' into `topics` via LISTAGG.
# - journal_name comes from a LEFT JOIN, so it is NULL when the paper's
#   journal_id has no match in mag_main_journals.
# - Papers without any non-NULL topic are removed by the WHERE clause.
# NOTE(review): a.year, book_title, online_date and d.level are selected in
# the subqueries but never used by the outer query.
# NOTE(review): LISTAGG without WITHIN GROUP (ORDER BY ...) does not guarantee
# topic order — confirm downstream code does not depend on it.
journal_join_query = \
"""
SELECT paper_id, doc_type, paper_title, journal_name, publication_date, listagg(topic_name, '|') topics
FROM (
SELECT  a.paper_id, a.doc_type, a.paper_title, a.year, 
        a.publication_date, b.normalized_name as journal_name,
        d.normalized_name as topic_name, d.level
FROM (SELECT paper_id, doc_type, paper_title, book_title, year, 
             publication_date, online_date, journal_id
      FROM mag_main_papers) a
LEFT JOIN (SELECT journal_id, normalized_name
           FROM mag_main_journals) b
ON a.journal_id=b.journal_id
LEFT JOIN (SELECT *
           FROM mag_advanced_paper_fields_of_study) c
ON a.paper_id=c.paper_id
LEFT JOIN (SELECT *
           FROM mag_advanced_fields_of_study) d
ON c.field_of_study=d.field_of_study_id )
WHERE topic_name IS NOT NULL
GROUP BY paper_id, doc_type, paper_title, journal_name, publication_date
"""

In [None]:
# Pull the full paper/journal/topic table defined by `journal_join_query`
# from Redshift, staging the unload in S3 under `base_save_path`.
all_data = (
    spark.read
    .format("com.databricks.spark.redshift")
    .options(
        url=redshift_url,
        user="app_user",
        password=redshift_password,
        query=journal_join_query,
        tempdir=base_save_path,
        forward_spark_s3_credentials=True,
    )
    .load()
)

In [None]:
# Print the column names/types Spark inferred for the Redshift query result.
all_data.printSchema()

In [None]:
# Peek at the first five rows of the pulled data.
all_data.show(n=5)

In [None]:
# Training pool: papers published strictly after 1950-01-01 and strictly
# before 2021-06-01 (the start of the held-out test window). publication_date
# is parsed to a date (unparseable values become NULL and fail the filters)
# and the '|'-joined topics string is split into an array.
# NOTE(review): the lower bound is exclusive, so papers dated exactly
# 1950-01-01 are dropped — confirm that is intended.
filtered_data = (
    all_data
    .select(
        'paper_id', 'doc_type', 'paper_title', 'journal_name',
        F.to_date(F.col('publication_date'), 'yyyy-MM-dd').alias('publication_date'),
        # split() takes a regex, so the pipe must be escaped; a raw string is
        # required because "\|" is an invalid Python escape sequence
        # (DeprecationWarning, and SyntaxWarning on Python 3.12+).
        F.split(F.col('topics'), r"\|").alias('topics'),
    )
    .filter(F.col('publication_date') > "1950-01-01")
    .filter(F.col('publication_date') < "2021-06-01")
)

In [None]:
# Out-of-time test pool: papers published from 2021-06-01 (inclusive) up to
# 2021-10-01 (exclusive), i.e. right after the training window ends.
# Same projection as the training pool: date parsed, topics split on '|'.
filtered_test_data = (
    all_data
    .select(
        'paper_id', 'doc_type', 'paper_title', 'journal_name',
        F.to_date(F.col('publication_date'), 'yyyy-MM-dd').alias('publication_date'),
        # Raw string: "\|" is an invalid Python escape sequence; the regex
        # needs the literal backslash to match a '|' character.
        F.split(F.col('topics'), r"\|").alias('topics'),
    )
    .filter(F.col('publication_date') >= "2021-06-01")
    .filter(F.col('publication_date') < "2021-10-01")
)

In [None]:
# Mark the training pool for caching, then materialise it; the row count is
# the cell's displayed output.
filtered_data.cache()
filtered_data.count()

In [None]:
# Mark the out-of-time test pool for caching, then materialise it via count().
filtered_test_data.cache()
filtered_test_data.count()

## Get Train/Val/Test

The following cell only needs to be run once. Each run overwrites the saved train/val/test splits in S3 with a new random split.

In [None]:
def split_into_train_val_test(data, test_data, save_path, weights=(0.995, 0.0045, 0.0005), seed=None):
    """Randomly split `data` into train/val/test and write each split as parquet.

    The random test split is extended with `test_data` (the out-of-time rows)
    before writing. Existing output at the target paths is overwritten.

    Parameters
    ----------
    data : pyspark DataFrame
        Pool to split randomly.
    test_data : pyspark DataFrame
        Extra rows appended to the test split; must contain every column of
        `data` (extra columns are dropped).
    save_path : str
        Output prefix ending in "/"; splits go to "<save_path>train",
        "<save_path>val" and "<save_path>test".
    weights : sequence of float
        Relative sizes of the train/val/test splits.
    seed : int, optional
        Seed for `randomSplit`. With the default None Spark picks a random
        seed, so every run produces a different split; pass an int to make
        the split reproducible.
    """
    train, val, test = data.randomSplit(list(weights), seed)

    train.write.mode('overwrite').parquet(f"{save_path}train")

    val.write.mode('overwrite').parquet(f"{save_path}val")

    # union() matches columns by position, so reorder test_data's columns to
    # the split's column order first.
    test.union(test_data.select(*test.columns)).write.mode('overwrite').parquet(f"{save_path}test")

In [None]:
# Write the train/val/test parquet splits under base_save_path.
split_into_train_val_test(
    data=filtered_data,
    test_data=filtered_test_data,
    save_path=base_save_path,
)

## Transforming the Data

In [None]:
def _read_split_excluding_patents(split_name):
    """Read the saved `split_name` parquet split and drop Patent documents.

    `doc_type != 'Patent'` alone evaluates to NULL (treated as false) for rows
    whose doc_type is NULL, so those rows would be silently discarded, even
    though the doc_type vocabulary reserves a [NONE] token for missing values.
    eqNullSafe treats NULL as "not equal to 'Patent'", so those rows are kept.
    """
    split_df = spark.read.parquet(f"{base_save_path}{split_name}")
    return split_df.filter(~F.col('doc_type').eqNullSafe('Patent'))


train = _read_split_excluding_patents("train")
val = _read_split_excluding_patents("val")
test = _read_split_excluding_patents("test")

In [None]:
# Mark the training split for caching and materialise it with a count.
train.cache()
train.count()

In [None]:
# Mark the validation split for caching and materialise it with a count.
val.cache()
val.count()

In [None]:
# Mark the test split for caching and materialise it with a count.
test.cache()
test.count()

In [None]:
# Summary statistics (via describe()) of paper-title length, measured in
# single-space-separated words, over the training split.
title_words = train.select(F.split(F.col('paper_title'), " ").alias('words'))
title_lengths = title_words.select(F.size(F.col('words')).alias("paper_title_len"))
title_lengths.describe().show()

In [None]:
# Frequency of each single-space-separated title word in the training split,
# keeping only words that occur more than 100 times. Collected to pandas as a
# two-column (word, count) table; it is later passed to create_vocab as the
# precomputed counts for the paper_title vocabulary.
word_counts = (
    train
    .select(F.split(F.col('paper_title'), " ").alias('words'))
    .withColumn("word", F.explode(F.col('words')))
    .groupBy('word')
    .count()
    .filter(F.col('count') > 100)
    .toPandas()
)

In [None]:
# (number of retained words, number of columns) of the word-count table.
len(word_counts.index), len(word_counts.columns)

### Vocab (using a basic tokenizer that splits titles on spaces)

In [None]:
def get_value_counts_for_column(data, col_name):
    """Return per-value counts of `col_name` as a pandas DataFrame, most frequent first.

    The array-valued 'topics' column is exploded so that each topic is counted
    individually. Any other column is counted by its whole value, and rows
    where either paper_id or that column is NULL are dropped first.
    The result has two columns: the value and 'count'.
    """
    if col_name != 'topics':
        projected = data.select('paper_id', col_name).na.drop()
    else:
        exploded = F.explode(F.col(col_name)).alias(col_name)
        projected = data.select(exploded).na.drop()
    ranked = projected.groupBy(col_name).count().orderBy('count', ascending=False)
    return ranked.toPandas()


def create_vocab(data, save_path, col_name, cutoff=5, unk_token=True, none_token=True, val_count_df=None):
    """Build a token -> integer-id vocabulary for `col_name` and upload it to S3.

    Parameters
    ----------
    data : pyspark DataFrame
        Source data; only scanned (via get_value_counts_for_column) when
        `val_count_df` is None.
    save_path : str
        S3 URI like "s3://<bucket>/<prefix>/" (trailing "/" expected).
    col_name : str
        Column the vocabulary is built for; also names the uploaded file.
    cutoff : int
        Minimum count (inclusive) for a value to enter the vocabulary.
    unk_token, none_token : bool
        Whether to prepend the "[UNK]" / "[NONE]" special tokens (in that order).
    val_count_df : pandas.DataFrame, optional
        Precomputed two-column (value, count) table. It is copied, not modified.

    Returns
    -------
    dict
        Token -> id. Ids start at 1: special tokens first, then the kept values
        in the order of the counts table (most frequent first when the counts
        come from get_value_counts_for_column).

    Side effects
    ------------
    Uploads the pickled dict to
    s3://<bucket>/<last path component of save_path>/vocab/<col_name>_vocab.pkl.
    """
    # Explicit None check: the previous bare `except:` also swallowed unrelated
    # errors (including KeyboardInterrupt) and silently rescanned `data`.
    if val_count_df is not None:
        val_counts = val_count_df.copy()
    else:
        val_counts = get_value_counts_for_column(data, col_name)

    token_col = f"{col_name}_token"
    val_counts.columns = [token_col, "count"]
    kept_tokens = list(val_counts.loc[val_counts["count"] >= cutoff, token_col])

    # Build the special-token prefix with boolean logic. Bitwise `&` on the
    # flags gave the wrong answer for truthy non-bool values (e.g. 1 & 2 == 0).
    special_tokens = []
    if unk_token:
        special_tokens.append("[UNK]")
    if none_token:
        special_tokens.append("[NONE]")
    token_list = special_tokens + kept_tokens

    # Ids start at 1, so 0 is never assigned (presumably reserved for padding
    # downstream — confirm). A duplicated token keeps its last id, as dict(zip()) did.
    final_vocab = {token: idx for idx, token in enumerate(token_list, start=1)}

    client = boto3.client('s3')
    bucket_name = save_path.split("/")[2]
    # NOTE(review): only the last path component of save_path becomes the key
    # prefix (e.g. "s3://b/x/y/" -> "y/vocab/..."). transform_dataset loads
    # vocabs with the same rule, so change both together.
    vocab_key = f"{save_path.split('/')[-2]}/vocab/{col_name}_vocab.pkl"
    client.put_object(Body=pickle.dumps(final_vocab), Bucket=bucket_name, Key=vocab_key)

    return final_vocab

In [None]:
def _vocab_s3_location(save_path, col_name):
    """Return the (bucket, key) under which create_vocab stores `col_name`'s vocab."""
    path_parts = save_path.split("/")
    return path_parts[2], f"{path_parts[-2]}/vocab/{col_name}_vocab.pkl"


def _load_vocab(client, save_path, col_name):
    """Download and unpickle the vocab dict for `col_name` from S3."""
    bucket_name, vocab_key = _vocab_s3_location(save_path, col_name)
    body = client.get_object(Bucket=bucket_name, Key=vocab_key)['Body'].read()
    # NOTE(review): pickle.loads can execute arbitrary code; only point
    # save_path at a bucket whose contents you trust.
    return pickle.loads(body)


def transform_dataset(data, save_path, dataset_type='train', val_count_df=None, doc_type_cutoff=300000, journal_cutoff=50,
                      topic_cutoff=100, title_cutoff=100):
    """Tokenize one dataset split and write it to S3 as parquet.

    For dataset_type == 'train', the doc_type, journal_name, topics and
    paper_title vocabularies are built from `data` with create_vocab (which
    also uploads them). For any other split, they are loaded from the
    location create_vocab writes to under `save_path`.

    Parameters
    ----------
    data : pyspark DataFrame
        Must provide paper_id, publication_date, doc_type, journal_name,
        topics (array of topic names) and paper_title columns.
    save_path : str
        S3 URI ending in "/". Output is written (overwriting) to
        f"{save_path}tokenized_data/{dataset_type}".
    dataset_type : str
        'train', 'val' or 'test'. It controls vocab build vs. load and the
        number of output partitions.
    val_count_df : pandas.DataFrame, optional
        Precomputed (word, count) table for the paper_title vocab. Used for
        'train' only.
    doc_type_cutoff, journal_cutoff, topic_cutoff, title_cutoff : int
        Minimum counts passed to create_vocab. Used for 'train' only.
    """
    if dataset_type == 'train':
        print("Getting vocabs")
        doc_vocab = create_vocab(data, save_path, "doc_type", cutoff=doc_type_cutoff, unk_token=True, none_token=True)
        journal_vocab = create_vocab(data, save_path, "journal_name", cutoff=journal_cutoff, unk_token=True, none_token=True)
        target_vocab = create_vocab(data, save_path, "topics", cutoff=topic_cutoff, unk_token=False, none_token=False)
        title_vocab = create_vocab(data, save_path, "paper_title", cutoff=title_cutoff, unk_token=True, none_token=True,
                                   val_count_df=val_count_df)
    else:
        print("Loading vocabs")
        client = boto3.client('s3')
        doc_vocab = _load_vocab(client, save_path, "doc_type")
        journal_vocab = _load_vocab(client, save_path, "journal_name")
        target_vocab = _load_vocab(client, save_path, "topics")
        title_vocab = _load_vocab(client, save_path, "paper_title")

    print(f"Doc type vocab length: {len(doc_vocab)}")
    print(f"Journal vocab length: {len(journal_vocab)}")
    print(f"Target vocab length: {len(target_vocab)}")
    print(f"Paper title vocab length: {len(title_vocab)}")

    def tokenize_target(feature):
        """Map each topic name to its id, skipping out-of-vocabulary topics.

        The target vocab has no [UNK]/[NONE] tokens. A paper with no known
        topic, or a NULL topics array, is therefore encoded as [-1].
        """
        if feature is None:
            return [-1]
        token_feature = [target_vocab[part] for part in feature if part in target_vocab]
        return token_feature or [-1]

    def tokenize_feature(feature, feature_name='doc_type'):
        """Encode one categorical value as a one-element id list.

        NULL or empty values map to [NONE]; values missing from the vocab map
        to [UNK].
        """
        vocab = doc_vocab if feature_name == 'doc_type' else journal_vocab
        if not feature:
            return [vocab.get('[NONE]')]
        return [vocab.get(feature, vocab.get('[UNK]'))]

    def tokenize_title(feature):
        """Split a title on single spaces and map each word to its id ([UNK] if unseen)."""
        # Check for a NULL/empty title *before* splitting, so NULL titles map
        # to [NONE] instead of raising AttributeError inside the UDF.
        if not feature:
            return [title_vocab.get('[NONE]')]
        unk_token_id = title_vocab.get('[UNK]')
        return [title_vocab.get(word, unk_token_id) for word in feature.split(" ")]

    tokenize_title_udf = F.udf(tokenize_title, ArrayType(IntegerType()))
    tokenize_target_udf = F.udf(tokenize_target, ArrayType(IntegerType()))
    tokenize_feature_udf = F.udf(tokenize_feature, ArrayType(IntegerType()))

    print("Tokenizing data")
    final_data = (
        data
        .withColumn("doc_type_tok", tokenize_feature_udf(F.col('doc_type'), F.lit('doc_type')))
        .withColumn("journal_tok", tokenize_feature_udf(F.col('journal_name'), F.lit('journal_name')))
        .withColumn("target_tok", tokenize_target_udf(F.col('topics')))
        .withColumn("paper_title_tok", tokenize_title_udf(F.col('paper_title')))
    )

    print("Saving data")
    # Fewer output files for the smaller val/test splits.
    coalesce_num = {'train': 50, 'val': 10}.get(dataset_type, 5)
    (
        final_data
        .select('paper_id',
                'publication_date',
                'doc_type_tok',
                'journal_tok',
                'target_tok',
                'paper_title_tok')
        .coalesce(coalesce_num)
        .write.mode('overwrite')
        .parquet(f"{save_path}tokenized_data/{dataset_type}")
    )
    

In [None]:
# Build and upload the vocabularies from the training split, then tokenize it.
transform_dataset(
    train,
    iteration_save_path,
    dataset_type='train',
    val_count_df=word_counts,
    doc_type_cutoff=300000,
    journal_cutoff=100,
    topic_cutoff=500,
    title_cutoff=100,
)

In [None]:
# Tokenize the validation split with the vocabularies saved by the train run.
transform_dataset(val, iteration_save_path, dataset_type='val')

In [None]:
# Tokenize the test split with the vocabularies saved by the train run.
transform_dataset(test, iteration_save_path, dataset_type='test')

In [None]:
def get_value_counts_for_column(data, col_name):
    """Count occurrences of each value of `col_name`, most frequent first, as pandas.

    'topics' is an array column and is exploded before counting. Other
    columns are projected together with paper_id, and rows with a NULL in
    either are dropped. This behaves the same as the earlier definition of
    this helper in the notebook.
    """
    is_array_col = col_name == 'topics'
    source = (data.select(F.explode(F.col(col_name)).alias(col_name)) if is_array_col
              else data.select('paper_id', col_name))
    return (source.na.drop()
                  .groupBy(col_name)
                  .count()
                  .orderBy('count', ascending=False)
                  .toPandas())

In [None]:
# Per-topic counts over the training split (pandas, most frequent first).
val_counts = get_value_counts_for_column(data=train, col_name='topics')

In [None]:
# Keep the same cutoff as the topic_cutoff passed to transform_dataset for
# the train split. create_vocab keeps topics with count >= that cutoff, so
# this table then lists exactly the topics in the saved topics vocabulary.
topic_count_cutoff = 500

# Plain string literal: the f-prefix had no placeholders (lint F541).
val_counts.columns = ["topics_token", "count"]

final_vocab_df = val_counts[val_counts['count'] >= topic_count_cutoff].copy()

In [None]:
# Upload the filtered topic-count table (pickled pandas DataFrame) under the
# same "<last path component>/vocab/" prefix used for the vocab pickles.
client = boto3.client('s3')
_path_parts = iteration_save_path.split("/")
bucket_name = _path_parts[2]
vocab_key = f"{_path_parts[-2]}/vocab/topics_vocab_counts.pkl"
_ = client.put_object(Body=pickle.dumps(final_vocab_df), Bucket=bucket_name, Key=vocab_key)

## Get data into TFRecords (Use Local Machine or EC2)