In [1]:
import os
from datetime import datetime, timedelta

import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import collect_set, collect_list, lit, sum, udf, concat_ws, col, count, abs, date_format, \
    from_utc_timestamp, expr, min, max
from pyspark.sql.functions import col, udf, size
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.functions import explode, posexplode

In [2]:
# Obtain the Spark session explicitly instead of relying on a `spark` variable
# injected by the kernel. getOrCreate() hands back the already-running session
# when there is one, so environments that pre-define `spark` are unaffected,
# while a plain Python kernel no longer fails with NameError here.
spark = SparkSession.builder.getOrCreate()
spark

In [84]:
# Root directory for all data used by this notebook. It can be overridden with
# the OUTPUT_PATH environment variable and otherwise falls back to the relative
# directory "output".
OUTPUT_PATH: str = os.environ.get("OUTPUT_PATH", "output")

# Everything for this dataset lives under <OUTPUT_PATH>/globo.
BASE_DIR: str = os.path.join(OUTPUT_PATH, "globo")
DATASET_DIR: str = os.path.join(BASE_DIR, "dataset")

# Glob pattern matching every click CSV under <BASE_DIR>/archive/clicks/clicks.
BASE_DATASET_FILE: str = os.path.join(BASE_DIR, "archive", "clicks", "clicks", "*.csv")

    
# Load every CSV matched by BASE_DATASET_FILE into one Spark DataFrame.
# The first line of each file is treated as the header and column types are
# inferred from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(BASE_DATASET_FILE)
)

In [85]:
# Number of rows in the loaded click DataFrame.
df.count()

2988181

In [86]:
# Print the column names and types of the loaded click DataFrame.
df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- session_id: long (nullable = true)
 |-- session_start: long (nullable = true)
 |-- session_size: integer (nullable = true)
 |-- click_article_id: integer (nullable = true)
 |-- click_timestamp: long (nullable = true)
 |-- click_environment: integer (nullable = true)
 |-- click_deviceGroup: integer (nullable = true)
 |-- click_os: integer (nullable = true)
 |-- click_country: integer (nullable = true)
 |-- click_region: integer (nullable = true)
 |-- click_referrer_type: integer (nullable = true)



In [87]:
# Normalise the column names (SessionID / ItemID / Timestamp_), derive a proper
# timestamp column from the epoch-millisecond click time, sort chronologically
# and keep only the four columns used downstream.
# NOTE(review): the sample row printed below suggests the derived `Timestamp`
# has whole-second resolution (the milliseconds in `Timestamp_` are not
# reflected) - confirm this is intended.
df = (
    df.withColumnRenamed("session_id", "SessionID")
    .withColumnRenamed("click_timestamp", "Timestamp_")
    .withColumnRenamed("click_article_id", "ItemID")
    .withColumn(
        "Timestamp",
        F.from_unixtime(F.col("Timestamp_") / F.lit(1000)).cast("timestamp"),
    )
    .orderBy("Timestamp")
    .select("SessionID", "ItemID", "Timestamp", "Timestamp_")
)

In [88]:
# Peek at a single row to sanity-check the renamed columns and the derived Timestamp.
df.show(1)

+----------------+------+-------------------+-------------+
|       SessionID|ItemID|          Timestamp|   Timestamp_|
+----------------+------+-------------------+-------------+
|1506826329267796|234853|2017-10-01 00:00:00|1506826800026|
+----------------+------+-------------------+-------------+
only showing top 1 row



In [89]:
# Print the schema of the current `df` to confirm the column names and the Timestamp type.
df.printSchema()

root
 |-- SessionID: long (nullable = true)
 |-- ItemID: integer (nullable = true)
 |-- Timestamp: timestamp (nullable = true)
 |-- Timestamp_: long (nullable = true)



In [90]:
# Scratch cell: two bare integer literals; only the last one is echoed as the cell output.
# 1506826800026 equals the `Timestamp_` value of the row printed by df.show(1).
# NOTE(review): 1549033933 has ten digits, so it looks like an epoch in seconds
# rather than milliseconds; its origin is not visible here - confirm or delete this cell.
1506826800026
1549033933

1549033933

In [99]:
# Show the 100 most recent clicks that happened before 2017-10-17.
# The upper bound is written as the exclusive start of the next day: the
# previous literal '2017-10-16 24:59:59' is not a valid timestamp (hour 24), so
# it only worked when the comparison was done on strings and would match
# nothing when Spark casts the string to a timestamp instead.
# Filtering before sorting keeps the same rows and order while sorting less data.
(
    df.filter(col('Timestamp') < '2017-10-17 00:00:00')
    .orderBy(col('Timestamp'), ascending=False)
    .show(100)
)

+----------------+------+-------------------+-------------+
|       SessionID|ItemID|          Timestamp|   Timestamp_|
+----------------+------+-------------------+-------------+
|1508205226121508| 36162|2017-10-16 23:59:58|1508205598971|
|1508204554122178|277107|2017-10-16 23:59:56|1508205596794|
|1508205596265651|177442|2017-10-16 23:59:56|1508205596210|
|1508204173113977|211442|2017-10-16 23:59:55|1508205595299|
|1508204800214313| 36162|2017-10-16 23:59:55|1508205595791|
|1508204950581387| 70758|2017-10-16 23:59:52|1508205592460|
|1508205520266626| 36162|2017-10-16 23:59:51|1508205591160|
|1508205185244493|234978|2017-10-16 23:59:51|1508205591283|
|1508205557313642|202370|2017-10-16 23:59:49|1508205589408|
|1508203589203632|224730|2017-10-16 23:59:48|1508205588280|
|1508205442291596| 36162|2017-10-16 23:59:46|1508205586031|
|1508204438162127|315104|2017-10-16 23:59:46|1508205586181|
|1508205084221452|266024|2017-10-16 23:59:45|1508205585360|
|1508203661116684| 30760|2017-10-16 23:5

In [91]:
# Earliest and latest click time in the data set.
df.agg(F.min("Timestamp"), F.max("Timestamp")).show()

+-------------------+-------------------+
|     min(Timestamp)|     max(Timestamp)|
+-------------------+-------------------+
|2017-10-01 00:00:00|2017-11-13 18:04:14|
+-------------------+-------------------+



In [92]:
# Pull the latest click time back to the driver as a Python value.
# The aggregate produces a single row with a single column, so it is read
# positionally rather than by its generated column name.
max_timestamp = df.agg(F.max("Timestamp")).first()[0]
max_timestamp

datetime.datetime(2017, 11, 13, 18, 4, 14)

In [93]:
# Length, in days, of the trailing window measured back from the latest click.
# `timedelta` comes from the notebook's top import cell.
WINDOW_DAYS = 16

# Start of the window: WINDOW_DAYS before the most recent click.
init_timestamp = max_timestamp - timedelta(days=WINDOW_DAYS)
init_timestamp

datetime.datetime(2017, 10, 28, 18, 4, 14)

In [11]:
# Keep only the clicks at or after init_timestamp and mark the result for
# caching, since it is reused by the following cells.
_df = df.where(F.col("Timestamp") >= F.lit(init_timestamp)).cache()

In [12]:
# Number of clicks left after the init_timestamp filter. count() is an action,
# so it also triggers evaluation of the cached `_df`.
_df.count()

2811894

In [9]:
# Location of the prepared training split. The default is the original
# machine-specific absolute path; set the TRAIN_DATASET_FILE environment
# variable to run the notebook elsewhere without editing this cell.
# NOTE(review): this file lives under the "yoochoose" dataset directory while
# the cells above work on "globo" clicks - confirm this is the intended file.
TRAIN_DATASET_FILE: str = os.environ.get(
    "TRAIN_DATASET_FILE",
    '/media/workspace/triplet_session/output/yoochoose/dataset/train_0.01_test=random_42_none_SessionInteractionDataFrame_AvailableItems____None_7a340ca010.csv',
)

df_train = pd.read_csv(TRAIN_DATASET_FILE)
df_train.shape

(812394, 7)

In [10]:
# Preview the first rows of the training split.
df_train.head()

Unnamed: 0,SessionID,Timestamp,ItemID,Category,ItemIDHistory,AvailableItems,visit
0,171168,2014-04-01 00:00:32.610,214820231,0,"[0, 0, 0, 0, 0, 0, 0, 0, 0, 214594678]","[214834980, 214658280, 214552492, 214832383, 2...",1
1,389654,2014-04-01 00:00:48.736,214587557,0,"[0, 0, 0, 0, 0, 0, 0, 0, 0, 214826705]","[214507408, 214631735, 214730518, 214600479, 2...",1
2,375257,2014-04-01 00:01:03.866,214582935,0,"[0, 0, 0, 0, 0, 0, 0, 0, 0, 214644307]","[214674857, 214559019, 214555277, 214694756, 2...",1
3,263073,2014-04-01 00:01:07.619,214716982,0,"[0, 0, 0, 0, 0, 0, 0, 0, 0, 214716982]","[214544936, 214691248, 214836997, 214692718, 2...",1
4,345618,2014-04-01 00:01:15.896,214705119,0,"[0, 0, 0, 0, 0, 0, 0, 0, 0, 214687685]","[214552162, 214669867, 214708018, 214586910, 2...",1


In [14]:
# Distinct item ids present in `df_session`, kept as a one-column frame.
# NOTE(review): `df_session` is not defined in any cell visible here - it must
# come from an earlier or removed cell; confirm before a Restart & Run All.
df_item_s = df_session.loc[:, ['ItemID']].drop_duplicates()
df_item_s.shape

(7004, 1)

In [12]:
# Distinct item ids present in the training split, kept as a one-column frame.
df_item_t = df_train.loc[:, ['ItemID']].drop_duplicates()
df_item_t.shape

(19945, 1)