In [1]:
%matplotlib inline
%config InlineBackend.figure_format ='retina'

import os
import sys
import glob
import pickle
import seaborn as sns
import matplotlib.pyplot as plt

import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession


# Local Spark session for the whole notebook.
# With master 'local[*]' the executors run inside the driver JVM, so
# `spark.executor.memory` does not size the heap here; `spark.driver.memory`
# does. The driver setting only takes effect if the JVM has not started yet
# (fresh kernel, first Spark call) -- NOTE(review): if this kernel launches the
# JVM itself (e.g. a pyspark-shell kernel), set it via PYSPARK_SUBMIT_ARGS instead.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName("spark_sql_examples")
    .config("spark.driver.memory", "6g")    # effective in local mode
    .config("spark.executor.memory", "6g")  # only used if master becomes a cluster
    .getOrCreate()
)

sc = spark.sparkContext
# Legacy entry point kept because later cells read through `sqlContext`;
# passing `spark` explicitly guarantees both handles share one session.
sqlContext = SQLContext(sc, spark)

In [12]:
# --- Split parameters --------------------------------------------------------
SAMPLING_RATIO = 0.1  # fraction of all rows kept for this experiment
TRAIN_PART = 0.8      # share of the id-ordered sample that goes to the train split

# --- Paths -------------------------------------------------------------------
# Root of the raw Criteo "dac" dump; `train.txt` inside it is the
# tab-separated input read below.
DATA_PATH = '/workspace/data/criteo/dac'
TRAIN_PATH = os.path.join(DATA_PATH, 'train.txt')

In [4]:
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Column layout of the input: `_c0` is the label, followed by integer features
# and then string (categorical) features. The inferred schema printed below shows
# _c0.._c13 as integer and _c14 onward as string. The 26 string columns are the
# standard Criteo DAC layout -- TODO confirm the file has 40 tab-separated fields.
N_INT_COLS = 13   # integer feature columns after the label
N_CAT_COLS = 26   # string feature columns after the integer ones

# An explicit schema avoids the extra full pass over the input that
# `inferSchema` needs, while keeping Spark's default `_cN` column names
# that the rest of the notebook refers to.
train_schema = StructType(
    [StructField('_c%d' % i, IntegerType(), True)
     for i in range(0, 1 + N_INT_COLS)]
    + [StructField('_c%d' % i, StringType(), True)
       for i in range(1 + N_INT_COLS, 1 + N_INT_COLS + N_CAT_COLS)]
)

# 'file://' + absolute path yields a proper file:///... URI
# (prefixing 'file:///' to an absolute path produced four slashes).
df = spark.read.csv(
    'file://' + os.path.abspath(TRAIN_PATH),
    schema=train_schema,
    sep='\t',
    header=False,
)

In [5]:
# Sanity-check column names and types of the loaded frame
# (expected: `_c0`..`_c13` integer, the remaining columns string).
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: integer (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: integer (nullable = true)
 |-- _c4: integer (nullable = true)
 |-- _c5: integer (nullable = true)
 |-- _c6: integer (nullable = true)
 |-- _c7: integer (nullable = true)
 |-- _c8: integer (nullable = true)
 |-- _c9: integer (nullable = true)
 |-- _c10: integer (nullable = true)
 |-- _c11: integer (nullable = true)
 |-- _c12: integer (nullable = true)
 |-- _c13: integer (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: string (nullable = true)
 |-- _c19: string (nullable = true)
 |-- _c20: string (nullable = true)
 |-- _c21: string (nullable = true)
 |-- _c22: string (nullable = true)
 |-- _c23: string (nullable = true)
 |-- _c24: string (nullable = true)
 |-- _c25: string (nullable = true)
 |-- _c26: string (nullable = true)
 |-- _c27: string (

In [8]:
from pyspark.sql.functions import monotonically_increasing_id

# Tag each row with a unique, increasing (not consecutive) 64-bit id.
# Spark packs the partition index into the high bits and the row position
# into the low bits, so ordering by `id` follows the order rows were read.
row_id_col = monotonically_increasing_id()
df = df.withColumn('id', row_id_col)

In [9]:
# Estimate where the TRAIN_PART quantile of `id` lies from a small random
# subset of ids, rather than sorting every id. `ids_sample` stays a sorted
# list of Rows, which is what the split-point cell indexes into.
ID_SAMPLE_FRACTION = 0.01  # expected share of rows whose id is pulled to the driver
ID_SAMPLE_SEED = 42        # fixed so the split point is reproducible across runs

ids_sample = sorted(
    df
    .select('id')
    .sample(withReplacement=False, fraction=ID_SAMPLE_FRACTION, seed=ID_SAMPLE_SEED)
    .collect()
)

# An empty sample would make the split-point lookup fail with a bare IndexError.
assert ids_sample, "id sample is empty; increase ID_SAMPLE_FRACTION"

In [17]:
# The id found at the TRAIN_PART quantile of the sorted sample is the boundary:
# rows with a smaller id go to train, the rest to test.
split_id = ids_sample[int(len(ids_sample) * TRAIN_PART)]['id']

In [19]:
import pyspark.sql.functions as F

# Down-sample the full data and cache it, so the train/test filters below
# (and the writes that follow) reuse the same rows instead of rescanning the
# file. A fixed seed makes the sample identical across kernel restarts.
df_sample = (
    df
    .sample(withReplacement=False, fraction=SAMPLING_RATIO, seed=42)
    .cache()
)

# Split on the id boundary instead of randomly: ids follow read order, so
# train gets the earlier rows and test the later ones
# (NOTE(review): presumably the file is time-ordered -- confirm).
train_df = df_sample.filter(F.col('id') < split_id)
test_df = df_sample.filter(F.col('id') >= split_id)

In [20]:
# Sanity check: with TRAIN_PART = 0.8 the test/train row ratio should be
# close to 0.2 / 0.8 = 0.25.
n_test_rows = test_df.count()
n_train_rows = train_df.count()
n_test_rows / n_train_rows

0.25047156413040245

In [41]:
# Write the train split as a single CSV part-file with a header row.
# Spark creates a *directory* named train.csv holding one part-*.csv file.
# mode("overwrite") keeps the cell re-runnable; the default save mode
# ("error") aborts when the directory already exists from a previous run.
(
    train_df
    .repartition(1)
    .write
    .format("csv")
    .mode("overwrite")
    .option("header", "true")
    .save('file://' + os.path.abspath(os.path.join(DATA_PATH, "train.csv")))
)

In [26]:
# Hold-out ground truth: the row id plus the label column `_c0`.
test_labels_df = test_df[['id', '_c0']]

# Hold-out inputs: every column of the test split except the label.
test_features_df = test_df.drop('_c0')

In [42]:
# Write the hold-out split as two single-file CSV directories, joinable on `id`:
# labels (id + `_c0`) and label-free features.
# mode("overwrite") keeps the cell re-runnable; the default save mode
# ("error") aborts when a directory already exists from a previous run.
for frame, dir_name in (
    (test_labels_df, "test_labels.csv"),
    (test_features_df, "test_features.csv"),
):
    (
        frame
        .repartition(1)
        .write
        .format("csv")
        .mode("overwrite")
        .option("header", "true")
        .save('file://' + os.path.abspath(os.path.join(DATA_PATH, dir_name)))
    )

In [56]:
# Constant baseline: the average of the label column over the train split.
mean_ctr = train_df.agg(F.avg('_c0')).first()[0]
mean_ctr

0.25645748855844763

In [58]:
# Baseline submission: every test row is assigned the train mean as `proba`.
sample_submission = test_df.select(F.col('id'), F.lit(mean_ctr).alias('proba'))

In [59]:
# Write the baseline submission (id, proba) as a single CSV part-file with a header.
# mode("overwrite") keeps the cell re-runnable; the default save mode
# ("error") aborts when the directory already exists from a previous run.
(
    sample_submission
    .repartition(1)
    .write
    .format("csv")
    .mode("overwrite")
    .option("header", "true")
    .save('file://' + os.path.abspath(os.path.join(DATA_PATH, "sample_submission.csv")))
)