# Extract Imports

Similar to 1.0, but extracts the imports.

This uses Celery; make sure a worker is running (`celery -A tasks worker --loglevel=info -Ofair`).

Results are saved to multiple checkpoint files (one per batch), so an interrupted run can be resumed; joining them into a single file is done afterwards.

In [None]:
import numpy as np
import pandas as pd
import datetime
from IPython.display import display
from tasks import extract_imports, join_dataframes
from celery import group

# Load the PE32 sample list, parse its '%Y/%m/%d' date strings into datetimes
# and use them as the row index.
pe32 = (
    pd.read_csv('data/pe32_samples.csv')
    .assign(date=lambda frame: pd.to_datetime(frame['date'], format='%Y/%m/%d'))
    .set_index('date')
)


# Process the samples in batches of ~BATCH_SIZE rows. Each batch is fanned out
# to Celery (map), merged back into one result (reduce), joined onto the batch
# and written to its own checkpoint CSV, so an interrupted run only loses the
# batch in progress.
BATCH_SIZE = 10000
# Number of extract_imports tasks each batch is split into.
N_TASKS = 1000
# Each reduce round divides the number of join tasks by this factor.
JOIN_FANIN = 64

# np.array_split rejects 0 sections, so always use at least one batch;
# otherwise a dataset smaller than BATCH_SIZE would crash here.
n_batches = max(len(pe32) // BATCH_SIZE, 1)
batches = np.array_split(pe32, n_batches)

for idx, batch in enumerate(batches):
    print('[{0}] Starting batch {1}...'.format(datetime.datetime.now(), idx))
    n_tasks = N_TASKS
    buckets = np.array_split(batch, n_tasks)

    # Map step: one extract_imports task per non-empty bucket of sample links.
    print('[{0}] Sending parse tasks...'.format(datetime.datetime.now()))
    jobs = group([extract_imports.s(b.link.values.tolist()) for b in buckets if len(b) > 0])
    result = jobs.apply_async()
    print('[{0}] Done sending, waiting...'.format(datetime.datetime.now()))
    # join() blocks until every task has finished and returns their results
    # as a list, in task order; reuse it instead of calling get() repeatedly.
    partials = result.join()
    print('[{0}] All tasks done.'.format(datetime.datetime.now()))

    # Reduce step: merge the partial results with join_dataframes, shrinking
    # the number of tasks each round, until a single result remains.
    while len(partials) > 1:
        n_tasks = max(n_tasks // JOIN_FANIN, 1)
        signatures = [join_dataframes.s(b.tolist())
                      for b in np.array_split(partials, n_tasks) if len(b) > 0]
        print('[{0}] Sending {1} join tasks...'.format(datetime.datetime.now(), len(signatures)))
        result = group(signatures).apply_async()
        print('[{0}] Done sending, waiting...'.format(datetime.datetime.now()))
        partials = result.join()
        print('[{0}] Join done.'.format(datetime.datetime.now()))

    # The final partial is parsed with pd.read_json (presumably a JSON string
    # produced by join_dataframes) and joined onto the batch via 'link'.
    print('[{0}] Joining...'.format(datetime.datetime.now()))
    batch.join(pd.read_json(partials[0]), on='link').to_csv(
        path_or_buf='data/checkpoints/pe32_static_imports_{0}.csv'.format(idx))
    print('[{0}] Done.'.format(datetime.datetime.now()))

The following cell joins all checkpoint files into a single CSV.

In [None]:
import numpy as np
import pandas as pd
import datetime
from IPython.display import display

# Reload the sample list; in this cell it is only used to work out how many
# checkpoint files the extraction cell produced.
pe32 = pd.read_csv('data/pe32_samples.csv')
# Move the 'date' column into the index, parsed from its '%Y/%m/%d' strings.
pe32 = pe32.set_index(pd.to_datetime(pe32.pop('date'), format='%Y/%m/%d'))

# Must mirror the batch count of the extraction cell (batches of ~10000 rows,
# at least one) so that every checkpoint file is picked up; with plain
# int(len / 10000) a dataset under 10000 rows would read no checkpoint at all.
n_checkpoints = max(len(pe32) // 10000, 1)
chkpnt_path = 'data/checkpoints/pe32_static_imports_{0}.csv'

result = []
for checkpoint in range(n_checkpoints):
    # Read every column as str (no dtype inference).
    df = pd.read_csv(chkpnt_path.format(checkpoint), dtype=str)
    # Checkpoints were written with DataFrame.to_csv from a DatetimeIndex, which
    # serializes dates as ISO 'YYYY-MM-DD' rather than the '%Y/%m/%d' format of
    # the source CSV; forcing '%Y/%m/%d' fails on current pandas, so let pandas
    # parse the ISO strings.
    df['date'] = pd.to_datetime(df['date'])
    df = df.set_index('date')
    result.append(df)

# Stack all checkpoint frames into one table.
result = pd.concat(result)
# For readability, put the identifying columns first; all other columns keep
# their existing relative order. list.remove raises ValueError if one is absent.
trailing = result.columns.tolist()
for key in ('md5', 'link'):
    trailing.remove(key)
result = result[['md5', 'link'] + trailing]
# Write the merged table to a single CSV.
result.to_csv(path_or_buf='data/pe32_static_imports.csv')