# Sorting

This notebook sorts the data by timestamp.

The input data is expected in `/data/recsys/part-XXXXX`. The sorted output is written to `/data/recsys/sorted/part-XXXXX.csv`, where each file contains all the interactions from a single hour.

+ Min timestamp: 1612396800 (4 February 2021 00:00:00 UTC)
+ Max timestamp: 1614211199 (24 February 2021 23:59:59 UTC)

To convert a partID to the hour it covers, use the following function:

In [15]:
def part_to_timestamp(partID):
    import pandas as pd
    min_h = pd.to_datetime(partID * 3600 + 1612396800, unit='s')
    max_h = pd.to_datetime((partID + 1) * 3600 + 1612396800 - 1, unit='s')

    print(f'Part {partID:03d} from time {min_h} to {max_h}')

Example:

In [16]:
part_to_timestamp(partID=0)
part_to_timestamp(partID=100)
part_to_timestamp(partID=503)

Part 000 from time 2021-02-04 00:00:00 to 2021-02-04 00:59:59
Part 100 from time 2021-02-08 04:00:00 to 2021-02-08 04:59:59
Part 503 from time 2021-02-24 23:00:00 to 2021-02-24 23:59:59
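
As a quick sanity check on the bounds listed earlier, the sketch below derives the number of hourly parts from the minimum and maximum timestamps using only the standard library. The names `MIN_TS`, `MAX_TS`, `SECONDS_PER_HOUR`, and `timestamp_to_part` are introduced here for illustration and are not part of the notebook.

```python
from datetime import datetime, timezone

MIN_TS = 1612396800  # minimum timestamp stated above
MAX_TS = 1614211199  # maximum timestamp stated above
SECONDS_PER_HOUR = 3600


def timestamp_to_part(ts):
    """Hypothetical inverse of part_to_timestamp: Unix seconds -> hourly partID."""
    return (ts - MIN_TS) // SECONDS_PER_HOUR


last_part = timestamp_to_part(MAX_TS)
n_parts = last_part + 1

# Start of the last part, interpreted as UTC
last_start = datetime.fromtimestamp(MIN_TS + last_part * SECONDS_PER_HOUR, tz=timezone.utc)

print(last_part, n_parts, last_start.isoformat())
```

Under these bounds the partIDs run from 0 to 503, which gives 504 hourly files in total.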


## Read and sort the data

The work is split into three separate runs, because processing all the files at once freezes the server. The three partitions are:
+ Files from 0-99
+ Files from 100-199
+ Files from 200-290

In [None]:
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd

cluster = LocalCluster()
# cluster.scale(3)

client = Client(cluster)
client

In [None]:
import numpy as np

# Column names (in file order) mapped to their dtypes.
# The nullable 'Int32' dtype is used for engagement timestamps, which may be missing.
column_type = {
    'bert': str, 'hashtags': str, 'tweet_id': str, 'media': str, 'links': str, 'domains': str,
    'type': str, 'language': str, 'timestamp': np.uint32,
    'AUTH_user_id': str, 'AUTH_follower_count': np.uint32, 'AUTH_following_count': np.uint32,
    'AUTH_verified': bool, 'AUTH_account_creation': np.uint32,
    'READ_user_id': str, 'READ_follower_count': np.uint32, 'READ_following_count': np.uint32,
    'READ_verified': bool, 'READ_account_creation': np.uint32,
    'auth_follows_read': bool,
    'reply_timestamp': 'Int32', 'retweet_timestamp': 'Int32', 'quote_timestamp': 'Int32', 'like_timestamp': 'Int32'
}

First, we separate the data by hour. Since **1612396800** is the minimum timestamp, the part of each row is:
$$\text{part} = \left\lfloor \dfrac{\text{timestamp} - 1612396800}{3600} \right\rfloor, \ \ \text{where} \ \ \lfloor \cdot \rfloor \ \ \text{is the floor function}$$

In [2]:
df = dd.read_csv('/data/recsys/part-000*', names=list(column_type.keys()), header=None, sep='\x01', dtype=column_type)
df['part'] = df.apply(lambda row: (row.timestamp - 1612396800) // 3600, axis=1, meta=(None, 'uint16'))
df.groupby('part').apply(lambda group: group.drop(columns=['part']).to_csv(f'/data/recsys/sorted/part-{group.name:05d}.csv',index=False), meta=df._meta).size.compute()

In [None]:
df = dd.read_csv('/data/recsys/part-001*', names=list(column_type.keys()), header=None, sep='\x01', dtype=column_type)
df['part'] = df.apply(lambda row: (row.timestamp-1612396800)//3600, axis=1, meta=(None, 'uint16'))
df.groupby('part').apply(lambda group: group.drop(columns=['part']).to_csv(f'/data/recsys/sorted/part-{group.name:05d}.csv', mode='a', index=False, header=False), meta=df._meta).size.compute()

In [None]:
df = dd.read_csv('/data/recsys/part-002*', names=list(column_type.keys()), header=None, sep='\x01', dtype=column_type)
df['part'] = df.apply(lambda row: (row.timestamp-1612396800)//3600, axis=1, meta=(None, 'uint16'))
df.groupby('part').apply(lambda group: group.drop(columns=['part']).to_csv(f'/data/recsys/sorted/part-{group.name:05d}.csv', mode='a', index=False, header=False), meta=df._meta).size.compute()
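
The three cells above compute `part` with a row-wise `apply`, which calls a Python lambda once per row. As a possible alternative, the same floor division can be written as column arithmetic. The sketch below shows this on a small in-memory pandas frame with made-up timestamps. It has not been run against the actual dataset, and the name `demo` is hypothetical; the same expression is assumed to carry over to the Dask DataFrame column.

```python
import numpy as np
import pandas as pd

MIN_TS = 1612396800  # minimum timestamp of the dataset

# Illustrative timestamps only (not real rows): first and last second of
# hour 0, first second of hour 1, and the maximum timestamp.
demo = pd.DataFrame({
    'timestamp': np.array([1612396800, 1612400399, 1612400400, 1614211199], dtype=np.uint32)
})

# Vectorized floor division: one expression for the whole column
demo['part'] = ((demo['timestamp'] - MIN_TS) // 3600).astype('uint16')

print(demo['part'].tolist())  # [0, 0, 1, 503]
```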

Then we sort each hourly file individually by timestamp:

In [None]:
import pandas as pd
from tqdm.notebook import trange

# Parts run from 0 to 503 inclusive, so the range must end at 504
for i in trange(0, 504):
    df = pd.read_csv(f'/data/recsys/sorted/part-{i:05d}.csv')
    df = df.sort_values('timestamp')
    df.to_csv(f'/data/recsys/sorted/part-{i:05d}.csv', index=False)

### Train / validation

Move the files of the last 7 days from the `/data/recsys/sorted` folder into the `/data/recsys/RecsysDocker/test` folder to replicate the Twitter Docker execution. <br>
The files are sampled so that the total is approximately 10 million rows.<br>
Create `reals.csv` to simulate the leaderboard score.

In [38]:
# !mkdir /data/recsys/test_csv
# !mkdir /data/recsys/RecsysDocker/test

In [19]:
_from, _to = 504-24*7, 504
_sample_each = int(1e7//(_to-_from)) # total ~ 10M
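
To make the numbers concrete, the following sketch recomputes the window and the per-file sample size in plain Python. The variable names mirror the cell above without the leading underscore; `n_files` and `total_rows` are added here for illustration.

```python
start, end = 504 - 24 * 7, 504       # last 7 days of hourly parts
n_files = end - start                # number of hourly files in the window
sample_each = int(1e7 // n_files)    # rows sampled from each file
total_rows = sample_each * n_files   # rows across all sampled files

print(start, end, n_files, sample_each, total_rows)
# 336 504 168 59523 9999864
```

This assumes every file in the window holds at least `sample_each` rows, since sampling without replacement raises an error otherwise.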

In [22]:
for i in trange(_from, _to, leave=False):
    f = f"{i:05d}"
    !mv /data/recsys/sorted/part-{f}.csv /data/recsys/test_csv

In [24]:
# !ls /data/recsys/test_csv

In [29]:
import pandas as pd
from tqdm.notebook import trange

reals = []  # engagement columns of every sampled file
for i in trange(_from, _to, leave=False):
    df = pd.read_csv(f'/data/recsys/test_csv/part-{i:05d}.csv')
    df = df.sample(_sample_each, replace=False)
    # The last four columns are the engagement timestamps (the targets)
    reals.append(df.iloc[:, -4:])
    # Write the features only, renumbering the parts from 0
    df = df.iloc[:, :-4]
    newi = i - _from
    df.to_csv(f'/data/recsys/RecsysDocker/test/part-{newi:05d}', sep='\x01', header=False, index=False)
# Binary labels: 1 if the engagement happened, 0 otherwise
reals = pd.concat(reals)
reals.notna().astype(int).to_csv('/data/recsys/RecsysDocker/reals.csv', index=False, header=False)

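
The final step of the cell above turns the four engagement timestamp columns into binary labels: a present timestamp becomes 1 and a missing one becomes 0. A minimal sketch on made-up values (the frame `demo` and its timestamps are illustrative, not taken from the dataset):

```python
import pandas as pd

# Illustrative engagement columns; None marks "no engagement"
demo = pd.DataFrame({
    'reply_timestamp':   [None, 1612396900, None],
    'retweet_timestamp': [None, None, None],
    'quote_timestamp':   [None, None, None],
    'like_timestamp':    [1612396850, None, None],
})

# notna() yields booleans, astype(int) maps True/False to 1/0
labels = demo.notna().astype(int)

print(labels.values.tolist())
# [[0, 0, 0, 1], [1, 0, 0, 0], [0, 0, 0, 0]]
```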