In [1]:
from datetime import datetime
import pytz
import numpy as np 
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import IntegerType, ShortType
from pyspark.sql.functions import col

In [2]:
# Input CSV files, resolved relative to the notebook's working directory.
file_train_node = 'train.csv'
file_train_feat = 'user_features.csv'
file_test_node = 'test.csv'

# Explicit schemas spare Spark an inference pass over the data and pin the
# integer width chosen for every column.

# Training pairs: (node1_id, node2_id) plus their is_chat value.
schema_train_node = StructType([
    StructField("node1_id", IntegerType()),
    StructField("node2_id", IntegerType()),
    StructField("is_chat", ShortType()),
])

# Per-node feature table: node_id followed by thirteen ShortType columns
# named f1, f2, ..., f13 (same order as the CSV header is assumed to use).
schema_train_feat = StructType(
    [StructField("node_id", IntegerType())]
    + [StructField("f{}".format(k), ShortType()) for k in range(1, 14)]
)

# Test pairs: a row id plus the (node1_id, node2_id) pair; no is_chat column.
schema_test_node = StructType([
    StructField("id", IntegerType()),
    StructField("node1_id", IntegerType()),
    StructField("node2_id", IntegerType()),
])

In [3]:
# create sparksession
# Obtain the Spark session for this notebook; getOrCreate() hands back an
# already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("hikeathon")
    .getOrCreate()
)

In [4]:
def read_typed_csv(path, schema):
    """Load a CSV whose first line is a header into a DataFrame typed by `schema`."""
    return spark.read.csv(path, schema=schema, header=True)


train_node = read_typed_csv(file_train_node, schema_train_node)
train_feat = read_typed_csv(file_train_feat, schema_train_feat)

test_node = read_typed_csv(file_test_node, schema_test_node)

In [5]:
# feature is_prev_chat
train_node.registerTempTable('original')
train_node.registerTempTable('other')
prev_is_chat = spark.sql("""select original.node1_id, original.node2_id, original.is_chat,
                      coalesce(other.is_chat, 0) as prev_is_chat 
                      from original left join other
                      on original.node1_id = other.node2_id
                      and original.node2_id = other.node1_id""")

In [6]:
# Attach both endpoints' user features to every training pair.
# createOrReplaceTempView replaces the deprecated registerTempTable; view names are unchanged.
prev_is_chat.createOrReplaceTempView('ip')
train_feat.createOrReplaceTempView('tf')

# Feature columns copied for each endpoint, in the original order (f1..f10, f12, f13).
# f11 is skipped exactly as in the previous hand-written select lists.
# NOTE(review): f11 is declared in schema_train_feat but never used — confirm this is intentional.
node_feat_cols = ['f{}'.format(i) for i in range(1, 14) if i != 11]


def _aliased_feats(suffix):
    """Return the select items 'tf.fK as fK_<suffix>' for every column in node_feat_cols."""
    return ', '.join('tf.{0} as {0}_{1}'.format(c, suffix) for c in node_feat_cols)


# Left joins keep every pair; an endpoint absent from user_features gets null features.
node_one = spark.sql("""select ip.node1_id, ip.node2_id, ip.prev_is_chat, ip.is_chat, {}
                        from ip left join tf on tf.node_id = ip.node1_id""".format(_aliased_feats('n1')))
node_one.createOrReplaceTempView('n1_feat')
train = spark.sql("""select n1_feat.*, {}
                     from n1_feat left join tf on tf.node_id = n1_feat.node2_id""".format(_aliased_feats('n2')))

In [7]:
# Peek at the first five assembled rows (DataFrame.head(n) delegates to take(n)).
train.head(5)

[Row(node1_id=4199003, node2_id=148, prev_is_chat=0, is_chat=0, f1_n1=31, f2_n1=31, f3_n1=31, f4_n1=31, f5_n1=31, f6_n1=31, f7_n1=31, f8_n1=31, f9_n1=31, f10_n1=30, f12_n1=30, f13_n1=7, f1_n2=31, f2_n2=30, f3_n2=0, f4_n2=31, f5_n2=30, f6_n2=0, f7_n2=31, f8_n2=30, f9_n2=0, f10_n2=31, f12_n2=0, f13_n2=15),
 Row(node1_id=178118, node2_id=148, prev_is_chat=0, is_chat=0, f1_n1=4, f2_n1=0, f3_n1=0, f4_n1=6, f5_n1=0, f6_n1=0, f7_n1=5, f8_n1=0, f9_n1=0, f10_n1=6, f12_n1=0, f13_n1=15, f1_n2=31, f2_n2=30, f3_n2=0, f4_n2=31, f5_n2=30, f6_n2=0, f7_n2=31, f8_n2=30, f9_n2=0, f10_n2=31, f12_n2=0, f13_n2=15),
 Row(node1_id=7334223, node2_id=148, prev_is_chat=0, is_chat=0, f1_n1=31, f2_n1=1, f3_n1=1, f4_n1=26, f5_n1=2, f6_n1=1, f7_n1=20, f8_n1=2, f9_n1=1, f10_n1=13, f12_n1=1, f13_n1=7, f1_n2=31, f2_n2=30, f3_n2=0, f4_n2=31, f5_n2=30, f6_n2=0, f7_n2=31, f8_n2=30, f9_n2=0, f10_n2=31, f12_n2=0, f13_n2=15),
 Row(node1_id=5573525, node2_id=148, prev_is_chat=0, is_chat=0, f1_n1=31, f2_n1=6, f3_n1=0, f4_n1=31

In [8]:
# Write the assembled features as one uncompressed parquet partition into a folder
# stamped with the current Asia/Kolkata time (second resolution), so runs more than a
# second apart land in distinct folders.
ist_now = datetime.now(pytz.timezone('Asia/Kolkata'))
folder_name = 'features_' + ist_now.strftime('%Y%m%d_%H%M%S')
train.repartition(1).write.parquet(folder_name, compression='uncompressed')