In [1]:
%%capture
!pip install packaging -U
!pip install awswrangler swifter[notebook]
!pip install --upgrade s3fs

In [2]:
from collections.abc import Iterable

import awswrangler as wr
import dask.dataframe as dd
import numpy as np
import pandas as pd
import pyarrow as pa
import scipy.sparse
import swifter


In [3]:
# schema = pa.parquet.read_schema('s3://cc-download-orbis-global/res/res-2020-05/res/batch_n_0.parquet')
# schema
# Explicit Arrow schema for the crawl result files (all fields nullable)
schema = pa.schema([
    pa.field('url', pa.string()),
    pa.field('url_host_name', pa.string()),
    pa.field('url_host_registered_domain', pa.string()),
    pa.field('url_host_tld', pa.string()),
    pa.field('fetch_time', pa.timestamp('s')),
    pa.field('content_digest', pa.string()),
    pa.field('crawl', pa.string()),
    pa.field('content_languages', pa.string()),
    pa.field('partition', pa.int8()),
    pa.field('keyword_paragraphs', pa.list_(pa.string())),
])
schema

url: string
url_host_name: string
url_host_registered_domain: string
url_host_tld: string
fetch_time: timestamp[s]
content_digest: string
crawl: string
content_languages: string
partition: int8
keyword_paragraphs: list<item: string>
  child 0, item: string

In [4]:
%%time
df = wr.s3.read_parquet('s3://cc-download-orbis-global/res/',
                        ray_args={'bulk_read': True}, schema=schema) #, dtype_backend='pyarrow'

CPU times: user 36min 59s, sys: 9min 15s, total: 46min 15s
Wall time: 33min 40s


In [4]:
# Only the domain, the crawl id and the keyword paragraphs are needed for the LLM merge
df = df.drop(columns=[
    'url', 'url_host_name', 'url_host_tld', 'fetch_time',
    'content_digest', 'content_languages', 'partition',
])

In [None]:
# Store the (highly repetitive) domain column as categorical, presumably to save memory
df['url_host_registered_domain'] = df['url_host_registered_domain'].astype('category')

In [13]:
# paragraphs = df.keyword_paragraphs.explode().dropna().drop_duplicates()
# paragraphs = paragraphs[paragraphs.str.len()>=30]
# paragraphs = paragraphs.str[:20000]
# paragraphs = paragraphs.rename('paragraphs').to_frame()

In [9]:
# len(paragraphs) # 50 million paragraphs

50845309

In [14]:
# wr.s3.to_parquet(paragraphs, 's3://cc-download-orbis-global/res_paragraphs_input_llm/', dataset=True, 
#                  max_rows_by_file=10000)

{'paths': ['s3://cc-download-orbis-global/res_paragraphs_input_llm/48af8983d5ff418c89b01bf5796ca507_4853.snappy.parquet',
  's3://cc-download-orbis-global/res_paragraphs_input_llm/48af8983d5ff418c89b01bf5796ca507_3698.snappy.parquet',
  's3://cc-download-orbis-global/res_paragraphs_input_llm/48af8983d5ff418c89b01bf5796ca507_4900.snappy.parquet',
  's3://cc-download-orbis-global/res_paragraphs_input_llm/48af8983d5ff418c89b01bf5796ca507_4868.snappy.parquet',
  's3://cc-download-orbis-global/res_paragraphs_input_llm/48af8983d5ff418c89b01bf5796ca507_3792.snappy.parquet',
  's3://cc-download-orbis-global/res_paragraphs_input_llm/48af8983d5ff418c89b01bf5796ca507_4730.snappy.parquet',
  's3://cc-download-orbis-global/res_paragraphs_input_llm/48af8983d5ff418c89b01bf5796ca507_4364.snappy.parquet',
  's3://cc-download-orbis-global/res_paragraphs_input_llm/48af8983d5ff418c89b01bf5796ca507_4916.snappy.parquet',
  's3://cc-download-orbis-global/res_paragraphs_input_llm/48af8983d5ff418c89b01bf5796ca

### Merge LLM results

In [5]:
# S3 paths of every LLM-result batch file to be merged back onto the paragraphs
res_batches = wr.s3.list_objects('s3://cc-download-orbis-global/llm_res_paragraphs_batches/')

In [6]:
%%time
res_dask = dd.read_parquet(res_batches)
res = res_dask.compute()
del res_dask

CPU times: user 5min 6s, sys: 1min 38s, total: 6min 44s
Wall time: 4min 27s


In [7]:
# Use the same key column name as df so the two frames can be merged directly
res = res.rename(columns={'paragraphs': 'keyword_paragraphs'})

In [8]:
# One row per (page row, paragraph); empty paragraph lists become a single NaN row
df = df.explode(column='keyword_paragraphs')

In [9]:
# Attach the LLM labels to every paragraph; paragraphs without a label keep NaN
df = pd.merge(df, res, how='left', on='keyword_paragraphs')
del res

In [20]:
# Share of firms by their maximum affectedness score; firms whose `affected` is all-NaN
# (no labelled paragraph) are dropped by value_counts
(
    df.groupby('url_host_registered_domain')['affected']
    .max()
    .value_counts(normalize=True)
    .sort_index()
)

affected
0.0    0.208841
1.0    0.316107
2.0    0.188484
3.0    0.286568
Name: proportion, dtype: float64

In [23]:
# memory_usage = df.memory_usage(deep=True)
# memory_usage.div(1e9).round(3)

Index                         1.280000e-07
url_host_registered_domain    4.520135e+01
crawl                         4.481736e+01
keyword_paragraphs            2.004863e+02
affected                      4.979707e+00
affectedness_category         3.311518e+01
tags                          3.311518e+01
tags_combined                 3.311518e+01
dtype: float64

In [None]:
# Paragraphs with substantive text (> 10 characters) that received no LLM label
df.loc[df['keyword_paragraphs'].str.len().gt(10) & df['affected'].isna()]

#### Paragraph expiry

In [10]:
# Attach each crawl's timestamp to its rows.
# fetch_time must stay a datetime column here: the groupby .diff() below needs date
# arithmetic, which a 'category' column does not support. (The previous cast also referenced
# the undefined name `fetch_time` and raised NameError.) The categorical cast of fetch_time
# happens in a later cell.
fetch_times = pd.read_csv('crawl_fetch_times.csv', parse_dates=['fetch_time'])
df = df.merge(fetch_times, on='crawl')
# diff/cumsum depend on row order, and the row order coming out of the parquet read and the
# merges is not guaranteed to be chronological. Sort by fetch time so each diff is the gap to
# the previous crawl in which the same domain carried the same paragraph.
df = df.sort_values('fetch_time', kind='stable').reset_index(drop=True)
paragraph_keys = ['url_host_registered_domain', 'keyword_paragraphs']
# Months (30-day blocks) since the previous sighting of this paragraph on this domain
df['month_diff'] = df.groupby(paragraph_keys, observed=True)['fetch_time'].diff().dt.days // 30
# Running total of those gaps; the first sighting (diff is NaN) counts as 0 months
df['months_unchanged'] = df.groupby(paragraph_keys, observed=True)['month_diff'].cumsum().fillna(0)

In [None]:
# Ordered categorical whose categories (and their order) come from crawl_fetch_times.csv
df['fetch_time'] = (
    df['fetch_time']
    .astype('category')
    .cat.set_categories(fetch_times.fetch_time, ordered=True)
)

In [11]:
# Rows whose paragraph has gone unchanged for 12+ months get affected = 0 (NaN stays NaN)
df['affected'] = df['affected'] * df['months_unchanged'].lt(12)

In [12]:
# For rows with affected == 0, set the descriptive label columns to missing (pd.NA, not 0)
# so unaffected paragraphs carry no category or tag labels.
# The mask is computed once and applied to all three columns in a single assignment.
not_affected = df['affected'] == 0
df.loc[not_affected, ['affectedness_category', 'tags', 'tags_combined']] = pd.NA
del not_affected

In [13]:
# The expiry helper columns are no longer needed once `affected` has been masked
df = df.drop(columns=['month_diff', 'months_unchanged'])

#### Format indicators

In [14]:
# A row mentions Covid if its keyword paragraph is longer than 10 characters;
# missing paragraphs stringify to 'nan' and therefore never count
df['covid_mention'] = df['keyword_paragraphs'].astype(str).str.len().gt(10)

In [15]:
from collections.abc import Iterable

In [16]:
# Most frequent raw tags among the last 1M rows (raw values include leading-space variants)
df['tags'].tail(1_000_000).explode().value_counts().head(50)

tags
                                     6687
customer hygiene measures            3318
closure                              3199
 supply chain issues                 2300
closure of facilities                1398
                                     ... 
 regular contact with authorities       1
measures to ensure operations           1
 limited contact                        1
 employees positive test                1
cases of SARS-CoV-2                     1
Name: count, Length: 33522, dtype: int64

In [33]:
# from collections.abc import Iterable
# for col in ['tags', 'tags_combined', 'affectedness_category']:
#     df[col] = df[col].swifter.apply(lambda d: d if isinstance(d, Iterable) else [])

Pandas Apply:   0%|          | 0/622463315 [00:00<?, ?it/s]

In [40]:
# One flag per affectedness category for every row of df.
# Each cleaned label is mapped to its indicator column *by name*. The previous version
# assigned pd.get_dummies output to the three target columns by position, but get_dummies
# orders columns alphabetically (demand, production, supply), so production_affected received
# the demand flag and vice versa. It also built dummies from unstripped labels, so whitespace
# variants could produce extra dummy columns.
affectedness_columns = {
    'production': 'production_affected',
    'demand': 'demand_affected',
    'supply': 'supply_affected',
}
# Select the one column before resetting the index to avoid copying the whole frame
df_cat = df[['affectedness_category']].reset_index().explode('affectedness_category')
df_cat['affectedness_category'] = df_cat['affectedness_category'].str.strip()
df_cat = df_cat[df_cat['affectedness_category'].isin(list(affectedness_columns))].copy()
for category, column in affectedness_columns.items():
    df_cat[column] = df_cat['affectedness_category'].eq(category)
# Collapse back to one row per original df row (any matching label sets the flag)
df = df.join(df_cat.groupby('index')[list(affectedness_columns.values())].max(), how='left')

In [None]:
# One flag per selected tag for every row of df (column names equal the tag strings).
# Flags are built per tag by name. The previous pd.get_dummies version assigned columns by
# position, while get_dummies sorts them alphabetically (closure, financial impact, hygiene
# measures, ...), so e.g. 'hygiene measures' received the 'closure' flag. Labels are stripped
# first so leading/trailing-space variants (visible in the raw tag counts) map to the same tag.
tags = ['hygiene measures', 'remote work', 'supply chain issues', 'closure', 'financial impact', 'travel restrictions']
# Select the one column before resetting the index to avoid copying the whole frame
df_tag = df[['tags_combined']].reset_index().explode('tags_combined')
df_tag['tags_combined'] = df_tag['tags_combined'].str.strip()
df_tag = df_tag[df_tag['tags_combined'].isin(tags)].copy()
for tag in tags:
    df_tag[tag] = df_tag['tags_combined'].eq(tag)
# Collapse back to one row per original df row (any matching tag sets the flag)
df = df.join(df_tag.groupby('index')[tags].max(), how='left')

In [69]:
# Make column names attribute-friendly (spaces -> underscores) and keep `tags` in sync
df.columns = df.columns.str.replace(' ', '_')
tags = [tag.replace(' ', '_') for tag in tags]

In [75]:
# Snapshot of production_affected, presumably kept for debugging/comparison
test = df['production_affected'].copy()

In [83]:
# Downcast every indicator column to float16 to keep the ~600M-row frame in memory
float_cols = ['covid_mention', 'affected', 'production_affected', 'demand_affected', 'supply_affected', *tags]
df[float_cols] = df[float_cols].astype(np.float16)

In [131]:
# Unaffected rows get 0 (rather than NaN) on every detailed category and tag indicator
df.loc[df['affected'].eq(0), ['production_affected', 'demand_affected', 'supply_affected', *tags]] = 0

#### Aggregate by firm and crawl date

In [85]:
# Calendar date of the crawl, used as the time key of the panel
df['date'] = df['fetch_time'].dt.date

In [87]:
# Few distinct dates across many rows -> categorical
df['date'] = df['date'].astype('category')

In [91]:
affectedness_categories = ['production_affected', 'demand_affected', 'supply_affected']
# Every indicator is aggregated with max: a firm-date is flagged if any of its rows is
agg_dict = {col: 'max' for col in ['affected', 'covid_mention', *affectedness_categories, *tags]}

In [92]:
# Firm x crawl-date panel. observed=False (the pandas 2 default, spelled out here) makes the
# categorical keys yield every domain x date combination, including combinations with no
# rows, whose aggregated values come out as NaN.
df_grouped = df.groupby(['url_host_registered_domain', 'date'], observed=False).agg(agg_dict)

In [93]:
# Back to a boolean flag; NaN (no rows for the firm-date) compares as False
df_grouped['covid_mention'] = df_grouped['covid_mention'].gt(0)

In [94]:
# Share of firms that mention Covid in at least one crawl
(
    df_grouped.groupby(level=0)['covid_mention']
    .max()
    .value_counts(normalize=True, dropna=False)
    .sort_index()
)

covid_mention
False    0.737393
True     0.262607
Name: proportion, dtype: float64

In [100]:
# Missing aggregated indicators are treated as "not flagged"
df_grouped[float_cols] = df_grouped[float_cols].fillna(value=0)

In [95]:
# set affected and other indicators to 0 for firms that were in CC but didn't mention Covid
# df_grouped.loc[df_grouped.keyword_paragraphs==0, 'affected'] = 0

In [96]:
# Per-column memory footprint of the panel, in GB
memory_usage_grouped = df_grouped.memory_usage(deep=True)
(memory_usage_grouped / 1e9).round(3)

Index                  0.886
affected               0.246
covid_mention          0.123
production_affected    0.246
demand_affected        0.246
supply_affected        0.246
hygiene_measures       0.246
remote_work            0.246
supply_chain_issues    0.246
closure                0.246
financial_impact       0.246
travel_restrictions    0.246
dtype: float64

In [135]:
# At the firm-date level too, unaffected observations get 0 on all detailed indicators
df_grouped.loc[df_grouped['affected'].eq(0), ['production_affected', 'demand_affected', 'supply_affected', *tags]] = 0

In [137]:
# Persist the firm x date panel; the index levels become ordinary columns
wr.s3.to_parquet(
    df_grouped.reset_index(),
    's3://cc-download-orbis-global/res_llm_consolidated/res_llm_consolidated.parquet',
    index=False,
)

{'paths': ['s3://cc-download-orbis-global/res_llm_consolidated/res_llm_consolidated.parquet'],
 'partitions_values': {}}

#### Merge with Orbis

In [None]:
# Static Orbis firm attributes for firms with a 4-digit NACE code.
# NOTE(review): `nace_sections_dict` and `nace` are not defined anywhere in this notebook;
# on a fresh kernel this cell raises NameError until they are loaded by an earlier cell.
query = 'SELECT * FROM orbis_global_min_5_empl_has_website_merged_url_list where length(nace_rev_2_core_code_4_digits) > 3'
orbis = wr.athena.read_sql_query(sql=query, database='ccindex')
for col in ['country', 'region_in_country', 'city']:
    orbis[col] = orbis[col].astype('category')
# 2-digit NACE division with leading zeros removed ('01' -> '1', '10' -> '10').
# lstrip, not strip: strip('0') also removed trailing zeros, so divisions 10, 20, ..., 90
# collapsed onto 01, 02, ..., 09.
# NOTE(review): confirm the keys of nace_sections_dict / nace use this unpadded form.
orbis['nace_2_digit'] = orbis['nace_rev_2_core_code_4_digits'].astype(str).str[:2].str.lstrip('0').astype('category')
orbis['nace_section'] = orbis.nace_2_digit.map(nace_sections_dict).astype('category')
orbis['nace_2_digit'] = orbis['nace_2_digit'].map(nace.set_index('nace_2_digit')['nace_description']).astype('category')
orbis.rename(columns={'websiteaddress': 'url_host_registered_domain'}, inplace=True)

# Time-varying Orbis data (employees, turnover)
query = 'SELECT * FROM orbis_global_min_5_empl_has_website_merged_url_list_dynamic where number_of_employees > 0 or operating_revenue_turnover > 0'
orbis_dynamic = wr.athena.read_sql_query(sql=query, database='ccindex')

# For each domain keep only the bvdid with the most employees, then the highest turnover,
# preferring firms with an available NACE code
keep_bvdids = (
    orbis_dynamic
    .merge(orbis[['bvdid', 'nace_2_digit', 'url_host_registered_domain']], on='bvdid', how='left')
    .sort_values(['number_of_employees', 'operating_revenue_turnover', 'nace_2_digit'], ascending=False)
    .drop_duplicates(subset=['url_host_registered_domain'], keep='first')
    .bvdid
)
orbis = orbis[orbis.bvdid.isin(keep_bvdids)].copy(deep=True)
orbis.drop_duplicates(subset=['url_host_registered_domain'], inplace=True, keep='first')

# Keep only Orbis firms that appear in the Common Crawl panel. The domains are the
# 'url_host_registered_domain' level of df_grouped's index. The row-level `df` has a plain
# integer index after the merges, so df.index.get_level_values(0) yielded row numbers and
# matched no domain.
ccurls = df_grouped.index.get_level_values('url_host_registered_domain').unique().to_list()
orbis = orbis[orbis.url_host_registered_domain.isin(ccurls)].copy(deep=True)
orbis_dynamic = orbis_dynamic[orbis_dynamic.bvdid.isin(orbis.bvdid)].copy(deep=True)
print(len(orbis))

In [None]:
# Attach the static Orbis attributes to every firm x date row, matched on the domain level
df_grouped_orbis = df_grouped.join(other=orbis.set_index('url_host_registered_domain'), how='left')

#### Add `cc_data_available` and update-frequency variables

In [None]:
%%time
df = wr.s3.read_parquet('s3://cc-download-orbis-global/res/',
                        ray_args={'bulk_read': True}, schema=schema)

In [8]:
# Replace the per-row fetch_time with the crawl-level timestamp and keep only its calendar date
fetch_times = pd.read_csv('crawl_fetch_times.csv', parse_dates=['fetch_time'])
df = df.drop(columns=['fetch_time']).merge(fetch_times, on='crawl')
df['date'] = df['fetch_time'].dt.date.astype('category')
df = df.drop(columns=['fetch_time'])

In [11]:
# Every row of the raw crawl data is, by construction, an observation with CC data
df['cc_data_available'] = True

In [12]:
# Load the firm x date panel written earlier
df_grouped = wr.s3.read_parquet(path='s3://cc-download-orbis-global/res_llm_consolidated/res_llm_consolidated.parquet')

In [15]:
# Flag the panel rows whose firm x date combination actually occurs in the crawl data
df_fin = pd.merge(
    df_grouped,
    df[['url_host_registered_domain', 'date', 'cc_data_available']].drop_duplicates(),
    on=['url_host_registered_domain', 'date'],
    how='left',
)

In [18]:
# Panel rows without a crawl observation get False. The left merge leaves this column as
# object (True/NaN); cast explicitly to bool instead of relying on fillna's deprecated
# implicit downcast, so the later `~df_fin.cc_data_available` mask is a real boolean negation
# (on object dtype, `~` applied to Python bools yields -2/-1).
df_fin['cc_data_available'] = df_fin['cc_data_available'].fillna(False).astype(bool)

In [21]:
# Indicator columns of the panel; tag names use underscores instead of spaces
tags = [
    tag.replace(' ', '_')
    for tag in ('hygiene measures', 'remote work', 'supply chain issues',
                'closure', 'financial impact', 'travel restrictions')
]
float_cols = ['covid_mention', 'affected', 'production_affected', 'demand_affected', 'supply_affected', *tags]

In [23]:
# Firm-dates without crawl data: the indicators are unknown (NaN), not 0
df_fin.loc[~df_fin['cc_data_available'], float_cols] = np.nan

In [26]:
# Share of firms by maximum affectedness over the panel (NaN firm-dates are ignored by max)
df_fin.groupby('url_host_registered_domain')['affected'].max().value_counts(normalize=True)

affected
0.0    0.792301
1.0    0.082986
3.0    0.075231
2.0    0.049482
Name: proportion, dtype: float64

In [27]:
# Share of firms that mention Covid in at least one crawl, computed on the final panel
df_fin.groupby('url_host_registered_domain')['covid_mention'].max().value_counts(normalize=True)

covid_mention
False    0.737393
True     0.262607
Name: proportion, dtype: Float64

In [29]:
# Persist the final panel including the cc_data_available flag
wr.s3.to_parquet(
    df_fin,
    's3://cc-download-orbis-global/res_llm_consolidated/res_llm_consolidated_full.parquet',
    index=False,
)

{'paths': ['s3://cc-download-orbis-global/res_llm_consolidated/res_llm_consolidated_full.parquet'],
 'partitions_values': {}}

#### Heartbeat variable based on content digest

In [5]:
# Per (domain, url): number of distinct crawls and of distinct content digests
relevant_cols = ['url_host_registered_domain', 'url', 'crawl', 'content_digest']
cdh = (
    df[relevant_cols]
    .groupby(['url_host_registered_domain', 'url'])
    .nunique()
)

In [6]:
# Per url: (distinct digests - 1) / (distinct crawls - 1), an approximation of the share of
# crawl-to-crawl transitions with new content. Caveats: a url seen in a single crawl gives
# 0/0 = NaN (or inf if it has several digests within that crawl), and content that reverts
# (A -> B -> A) is undercounted because only distinct digests are counted.
cdh['cd_change_share_per_url'] = (cdh['content_digest'] - 1) / (cdh['crawl'] - 1)

In [9]:
# Domain-level heartbeat: mean per-url change share (NaN urls are skipped by mean)
content_digest_heartbeat = cdh.groupby(level='url_host_registered_domain')['cd_change_share_per_url'].mean()

In [15]:
# Half precision is enough for a share in [0, 1] and halves the storage
content_digest_heartbeat = content_digest_heartbeat.astype('float16')

In [26]:
# Single-column DataFrame indexed by domain, ready to be written as parquet
content_digest_heartbeat = content_digest_heartbeat.to_frame('content_digest_heartbeat')

In [27]:
# Persist the heartbeat variable; the domain index is written along with the values
content_digest_heartbeat.to_parquet(path='s3://cc-download-orbis-global/res_llm_consolidated/res_llm_consolidated_content_digest_heartbeat.parquet')