In [1]:
# Cancel every SLURM job owned by user `aschade` before a new cluster is requested below.
# NOTE(review): `-u` is not restricted to the dask worker jobs - any other job of this user is cancelled too.
!scancel -u aschade

In [2]:
#%%
from os import makedirs
from os.path import exists
from pprint import pprint

from tqdm import tqdm
from time import time
import logging as log
import sys
import csv

import matplotlib.pyplot as plt
import matplotlib.dates as matdates
import numpy as np
import pandas as pd
pd.options.display.max_rows = 150
pd.options.display.max_columns = 50
import seaborn as sns
sns.set_style("whitegrid")

import dask
import dask.dataframe as dd
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

########################################################################################################

def floor(test, limit):
    """Clamp ``test`` from below.

    Returns ``limit`` when ``test`` compares strictly smaller than ``limit``;
    otherwise ``test`` itself is returned unchanged.
    """
    if test < limit:
        return limit
    return test

def ceiling(test, limit):
    """Clamp ``test`` from above.

    Returns ``limit`` when ``test`` compares strictly greater than ``limit``;
    otherwise ``test`` itself is returned unchanged.
    """
    if test > limit:
        return limit
    return test

  from distributed.utils import tmpfile


In [3]:
# The scheduler is started on one address; the dashboard link is rewritten to a second one before printing.
schedulerHost = '10.30.50.163'
dashboardHost = '10.60.110.163'

# Extra SLURM options handed to every worker job.
slurmDirectives = [
    # '--reservation=lab_rdurante_304',
    '--time=05:00:00',
    '--partition=haswell',
    '--nodes=1',
    '--job-name=dask',
    '--output=dask.out',
    '--error=dask.error',
    '--mail-user=aaron.schade@upf.edu',
    '--mail-type=NONE',
]

cluster = SLURMCluster(
    cores=10,
    memory='100GB',                 # alternative that was tried: '1400GB'
    local_directory='~/scratch',
    job_extra=slurmDirectives,
    n_workers=1,                    # open question from the author: per job or per node?
    # interfaces that gave workers (but no diagnostics): em1, em2, ib0
    # interfaces that gave no workers: lo, em1.851, idrac, em3 & em4 (no ipv4)
    interface='ib0',
    # an interface and a host address cannot both be given for the scheduler, so only the host is set
    scheduler_options={'host': schedulerHost},
)
cluster.scale(jobs=1)

scheduler = Client(cluster)
print(scheduler)

# dashboardLink = scheduler.dashboard_link
dashboardLink = scheduler.dashboard_link.replace(schedulerHost, dashboardHost)
print(dashboardLink)
for page in ('workers', 'graph'):
    print(dashboardLink.replace('status', page))

<Client: 'tcp://10.30.50.163:32788' processes=0 threads=0, memory=0 B>
http://10.60.110.163:8787/status
http://10.60.110.163:8787/workers
http://10.60.110.163:8787/graph


In [4]:
# List the SLURM jobs of user `aschade`, e.g. to see whether the `dask` worker job is still pending (PD).
!squeue -u aschade

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
          19119098   haswell     dask  aschade PD       0:00      1 (None)


In [14]:
# Paywall dates per newspaper domain, read from the spreadsheet (only these two columns are kept).
paywallsDF = pd.read_excel('paywalls.xlsx')[['paywall_date', 'url']]

# Map url -> 'YYYY-MM-DD'. Only the first 18 spreadsheet rows are used (hard-coded cutoff).
paywalls = {
    url: str(paywallTimestamp.date())
    for url, paywallTimestamp in zip(paywallsDF['url'][:18], paywallsDF['paywall_date'][:18])
}

# Additional nytimes.com dates live under suffixed keys; the suffix is removed again
# when the key is matched against `domain_name` further down.
paywalls.update({
    'nytimes.com-1': '2012-04-01',
    'nytimes.com-2': '2017-12-01',
})

print('paywalls:')
pprint(paywalls, indent=4)

######################################

# One news domain per line; surrounding whitespace is removed.
with open('newsSitesList.txt', 'r') as f:
    newsSitesList = [line.strip() for line in f]

# Every paywalled site also counts as a news site: append the keys that are not listed yet.
missingPaywallSites = [site for site in paywalls if site not in newsSitesList]
newsSitesList += missingPaywallSites

print('\nnews sites:')
pprint(newsSitesList[:10], indent=4)

######################################

# Household-level attributes. The per-visitor loop keeps only the first row's values
# of these columns for each machine and excludes them from the weekly aggregation.
individualCharacteristics = [
    'hoh_most_education',
    'census_region',
    'household_size',
    'hoh_oldest_age',
    'household_income',
    'children',
    'racial_background',
    'connection_speed',
    'country_of_origin',
    'zip_code',
]

###################

# Columns loaded from the parquet files: the browsing-event columns followed by the
# household attributes. The attribute part is derived from `individualCharacteristics`
# so the two lists cannot drift apart (they used to be maintained by hand in both places).
colsOfInterest = [
    # 'ref_domain_name',
    'domain_name',
    'event_date',

    'pages_viewed',
    'duration',
    # 'event_time',

    *individualCharacteristics,
]

paywalls:
{   'azcentral.com': '2012-09-12',
    'bostonglobe.com': '2011-10-01',
    'chicagotribune.com': '2012-11-01',
    'dallasnews.com': '2011-03-01',
    'inquirer.com': '2013-12-01',
    'latimes.com': '2012-03-05',
    'newsday.com': '2009-10-28',
    'nj.com': '2020-06-24',
    'nypost.com': '2011-06-01',
    'nytimes.com': '2011-03-28',
    'nytimes.com-1': '2012-04-01',
    'nytimes.com-2': '2017-12-01',
    'plaindealer.com': '2020-07-01',
    'sandiegouniontribune.com': '2012-06-21',
    'sfchronicle.com': '2013-03-01',
    'staradvertiser.com': '2011-08-03',
    'startribune.com': '2011-11-01',
    'sun-sentinel.com': '2012-04-09',
    'tampabay.com': '2013-09-12',
    'washingtonpost.com': '2013-06-12'}

news sites:
[   '7am.com',
    'abcnews.com',
    'accessatlanta.com',
    'accuweather.com',
    'active.com',
    'activedayton.com',
    'ajc.com',
    'ap.org',
    'aroundcny.com',
    'austin360.com']


In [None]:
def _baseDomain(siteKey):
    """Return the real domain for a paywall key.

    Keys such as 'nytimes.com-1' carry a '-<digits>' suffix to distinguish several
    dates for the same site. Only such a trailing numeric suffix is removed;
    hyphens inside a domain ('sun-sentinel.com') are left alone. The previous
    `str.strip('-12')` removed any of the characters '-', '1', '2' from both ends,
    which would mangle domains that start or end with one of them.
    """
    base, separator, suffix = siteKey.rpartition('-')
    if separator and base and suffix.isdigit():
        return base
    return siteKey


# Only paywall keys containing this substring are processed in this run.
siteFilter = 'nytimes.com-1'

# Set copy for constant-time membership tests in the innermost loop.
newsSiteSet = set(newsSitesList)

for newspaperSite, paywallDate in paywalls.items():
    if siteFilter not in newspaperSite: continue
    year = int(paywallDate[:4])

    ######################################################################################################

    print('\n' + f' {newspaperSite} '.center(80, '#'))

    # Require roughly one month of data on each side of the paywall within the calendar year.
    paywallTimestamp = pd.Timestamp(paywallDate)
    prePWmonths = paywallTimestamp.dayofyear/30.5
    postPWmonths = 12 - prePWmonths
    if prePWmonths < 1 or postPWmonths < 1: continue

    # +/- 90 days around the paywall date, clipped to the calendar year because
    # the parquet data is read from one directory per year.
    idealRangeStart = paywallTimestamp - pd.Timedelta('90 days')
    idealRangeEnd = paywallTimestamp + pd.Timedelta('90 days')
    yearStart = pd.Timestamp(f'{year}-01-01')
    yearEnd = pd.Timestamp(f'{year}-12-31')

    rangeStart = floor(idealRangeStart, yearStart)
    rangeEnd = ceiling(idealRangeEnd, yearEnd)
    datesOfInterest = pd.date_range(rangeStart, rangeEnd, freq='W')

    print(f'paywall date: {paywallDate}')
    print(list(datesOfInterest.month.unique()))

    ######################################################################################################

    ddf = dd.read_parquet(
        f'../comscore/parquet/{year}',
        index='machine_id',
        columns=colsOfInterest,
        engine='fastparquet',   # you HAVE TO use the same engine to read as you did for creating the parquet files!!!  only then will fast index lookups work - however, you need the pyarrow (or python-snappy) package installed for google's amazing 'snappy' compression algo
        )
    ddf = ddf[ddf['event_date'].between(rangeStart, rangeEnd)]

    # machine_ids with at least one visit to the paywalled site inside the date range
    visitors = list(
        ddf.loc[ddf['domain_name'] == _baseDomain(newspaperSite)]
        .index
        .unique()
        .compute()
        )
    # visitors = [13512886]

    # Without visitors there is nothing to write and nothing to concatenate afterwards.
    if not visitors:
        print(f'no visitors found for {newspaperSite} - skipping')
        continue

    # Output folder for the per-visitor files. Defined before the visitor loop because
    # the concatenation step after the loop needs it as well.
    subfolder = f'outputs/longTable/{newspaperSite}/individuals'
    makedirs(subfolder, exist_ok=True)

    for visitor in tqdm(visitors, desc='visitors done: '):
        thisVisitorRows = []

        # load this visitor's rows into memory
        df = ddf.loc[visitor].compute()
        df['numberVisits'] = 1           # fill all rows with 1 -> when aggregated, becomes counter

        # get all websites they visited
        allSites = df['domain_name'].unique()

        # setup one row with default values
        defaultValues = {
            'machine_id':           visitor,
            'date':                 '',
            'day_of_month':         '',
            'week_of_month':        '',
            'month':                '',
            'year':                 '',

            'domain_name':          '',
            'news_site_dummy':      0,
            'number_visits':        0,
            'number_pages_viewed':  0,
            'time_spent_on_site':   0,
            }
        # Add individual characteristics from the first row. `df` holds only this visitor's
        # rows, so plain column selection is enough. `df.loc[visitor, cols]` returns a Series
        # instead of a DataFrame when the visitor has exactly one row, and the following
        # `.iloc[0].to_dict()` then fails on a scalar.
        defaultValues.update(df[individualCharacteristics].iloc[0].to_dict())

        # aggregate by week and website, do not keep individual characteristics
        notIndividualCharacteristics = [col for col in df.columns if col not in individualCharacteristics]
        agged = (
            df[notIndividualCharacteristics]
            .groupby([pd.Grouper(key='event_date', freq='W', ), 'domain_name'])
            .sum()
            .reset_index(drop=False)
            )

        # (week, site) -> (visits, pages viewed, duration). Each pair occurs once because
        # it is the groupby key. This replaces a boolean-mask scan of `agged` per output row.
        aggLookup = {
            (week, site): (visits, pages, duration)
            for week, site, visits, pages, duration in zip(
                agged['event_date'],
                agged['domain_name'],
                agged['numberVisits'].values,
                agged['pages_viewed'].values,
                agged['duration'].values,
                )
            }

        # we are in one individual:
        # for this individual, for each date, for each site;
        # fill in the information you have, if not, leave default
        for date in datesOfInterest:
            # the date fields are identical for every site of this week
            dateFields = {
                'date':             date.strftime('%Y-%m-%d'),
                'day_of_month':     date.day,
                'week_of_month':    (date.day - 1)//7 + 1,
                'month':            date.month,
                'year':             date.year,
                }

            for site in allSites:
                # identifying stuff (updating existing keys keeps the column order of defaultValues)
                thisRow = defaultValues.copy()
                thisRow.update(dateFields)
                thisRow['domain_name'] = site
                thisRow['news_site_dummy'] = 1 if site in newsSiteSet else 0

                # aggregation information
                aggValues = aggLookup.get((date, site))
                if aggValues is not None:
                    thisRow['number_visits'] = aggValues[0]
                    thisRow['number_pages_viewed'] = aggValues[1]
                    thisRow['time_spent_on_site'] = aggValues[2]

                thisVisitorRows.append(thisRow)

        # printing to file
        # one per individual
        with open(f'{subfolder}/{visitor}.csv', 'w', newline='') as file:
            # header comes from the template row, so it does not depend on any row having been produced
            dict_writer = csv.DictWriter(file, list(defaultValues.keys()))

            dict_writer.writeheader()
            dict_writer.writerows(thisVisitorRows)

    # Concatenate all per-visitor files of this site into one long table indexed by machine_id.
    # A separate name is used so the parquet-backed `ddf` above is not shadowed.
    longTableDdf = (
        dd.read_csv(
            f'{subfolder}/*.csv',
            encoding_errors='replace')
        .set_index(
            'machine_id',
            npartitions='auto',
            compute=False)
    )

    longTableDdf.to_csv(f'outputs/longTable/{newspaperSite}/long_table.csv', single_file=True)


In [18]:
# Re-build the combined long table for one site from the per-visitor CSV files
# that are already on disk (the parquet data is not processed again).
newspaperSite = 'nytimes.com-1'
subfolder = f'outputs/longTable/{newspaperSite}/individuals'
ddf = dd.read_csv(
        f'{subfolder}/*.csv',
        encoding_errors='replace',
)

# index=False: no index is set on this frame, so the default integer row index would be
# written as an extra unnamed leading column. Without it the file has the same columns
# (machine_id ... zip_code) as the version written with machine_id as index.
ddf.to_csv(
    f'outputs/longTable/{newspaperSite}/long_table.csv',
    single_file=True,
    index=False,
)

['/gpfs42/robbyfs/scratch/lab_rdurante/aschade/paywalls/outputs/longTable/nytimes.com-1/long_table.csv']

In [12]:
# ---- testing: load a finished long table and look at its first rows ----
longTablePath = 'outputs/longTable/nytimes.com-2/long_table.csv'

ddf = dd.read_csv(longTablePath)
ddf.head()

Unnamed: 0,machine_id,date,day_of_month,week_of_month,month,year,domain_name,news_site_dummy,number_visits,number_pages_viewed,time_spent_on_site,hoh_most_education,census_region,household_size,hoh_oldest_age,household_income,children,racial_background,connection_speed,country_of_origin,zip_code
0,99534294,2017-09-10,10,2,9,2017,envybox.io,0,1,1,1,99,3,1,11,16,0,1,1,0,77084
1,99534294,2017-10-15,15,3,10,2017,dom.co.il,0,1,1,1,99,3,1,11,16,0,1,1,0,77084
2,99534294,2017-10-15,15,3,10,2017,graniru.org,0,0,0,0,99,3,1,11,16,0,1,1,0,77084
3,99534294,2017-10-15,15,3,10,2017,yandex.net,0,1,1,1,99,3,1,11,16,0,1,1,0,77084
4,99534294,2017-10-15,15,3,10,2017,likebtn.com,0,1,1,1,99,3,1,11,16,0,1,1,0,77084


In [13]:
# Summary of the long table's columns and dtypes.
ddf.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 21 entries, machine_id to zip_code
dtypes: object(2), int64(19)

In [14]:
# Total number of rows in the long table.
len(ddf)

49370796

In [15]:
# Number of distinct machines (visitors) contained in the long table.
ddf['machine_id'].nunique().compute()

8379