In [1]:
import gc
import gzip
import time
import json
import shutil
import os, sys
import tldextract
import collections
import pandas as pd
from tqdm import tqdm
import urllib.request

# had to add:
from multiprocessing import Pool

In [2]:
# Folder that holds the Common Crawl index path listing files.
storage_folder = '../data/index_paths/'

# os.listdir() returns entries in arbitrary order; sort so that the positions
# in cc_indexes (indexed directly by later cells) are the same on every run.
cc_indexes = sorted(
    os.path.join(storage_folder, f)
    for f in os.listdir(storage_folder)
    if os.path.isfile(os.path.join(storage_folder, f))
)

first_file = cc_indexes[0]

In [3]:
first_file

'../data/index_paths/cc-index.paths'

In [4]:
os.listdir(storage_folder)

['cc-index.paths', 'cc-index.paths.gz']

In [5]:
cc_indexes

['../data/index_paths/cc-index.paths', '../data/index_paths/cc-index.paths.gz']

Functions from parse_cc_index.py:

In [6]:
def read_every_line(fname, max_lines=-1):
    """Read a UTF-8 text file into a list of lines.

    Args:
        fname: Path of the text file to read.
        max_lines: Maximum number of lines to return. Zero or a negative
            value (the default) means "no limit". Floats such as 1e8 work
            because the value is only used in comparisons.

    Returns:
        list of str: the lines in file order, line endings kept as read.
    """
    lines = []
    with open(fname, encoding='utf-8') as f:
        for l in f:
            lines.append(l)
            # Stop as soon as exactly max_lines lines are collected; the
            # former `i > max_lines` check let max_lines + 2 lines through.
            if 0 < max_lines <= len(lines):
                break
    return lines

def reporthook(count, block_size, total_size):
    """Progress callback passed to urllib.request.urlretrieve by save().

    Rewrites a single status line on stdout with the downloaded amount,
    the transfer speed and the elapsed time.

    Args:
        count: Number of blocks transferred so far; 0 marks the start of a
            download and (re)starts the timer kept in the global start_time.
        block_size: Size of one block in bytes.
        total_size: Total size in bytes. When it is not positive the
            percentage is left out of the status line.
    """
    global start_time
    if count == 0:
        start_time = time.time()
        return
    duration = time.time() - start_time
    progress_size = int(count * block_size)
    # The clock may not have advanced yet on a very fast first block.
    speed = int(progress_size / (1024 * duration)) if duration > 0 else 0
    megabytes = progress_size / (1024 * 1024)
    if total_size > 0:
        # count * block_size overshoots on the final partial block: cap at 100.
        percent = min(100, int(progress_size * 100 / total_size))
        sys.stdout.write("\r...%d%%, %d MB, %d KB/s, %d seconds passed" %
                        (percent, megabytes, speed, duration))
    else:
        # Unknown or zero total size: a percentage would be meaningless
        # (and dividing by zero would raise).
        sys.stdout.write("\r...%d MB, %d KB/s, %d seconds passed" %
                        (megabytes, speed, duration))
    sys.stdout.flush()

def save(url, filename):
    """Download `url` into the local file `filename`, reporting progress through reporthook."""
    urllib.request.urlretrieve(url, filename=filename, reporthook=reporthook)

def process_index_file_line(line):
    """Parse one line of an index file into a flat record.

    The line is read as ``<key> <timestamp> <json>``: the second
    whitespace-separated token is the timestamp and everything after it
    is a JSON object.

    Args:
        line: One raw text line (a trailing newline is fine).

    Returns:
        tuple: ``(ts, url, tld_suffix, length, offset, filename, languages)``
        for records whose ``status`` is ``'200'``; ``languages`` falls back
        to ``'none'`` when the key is absent.
        An empty tuple ``()`` for lines that cannot be parsed, are not
        status 200, or lack a required field.
    """
    assert isinstance(line, str)

    # Split off only the first two fields. Splitting the whole line on the
    # timestamp text (as before) cut the JSON apart whenever the same digits
    # also appeared inside the URL or the file name.
    parts = line.strip().split(None, 2)
    if len(parts) < 3:
        return ()
    ts = parts[1]

    try:
        data = json.loads(parts[2])
    except ValueError:
        return ()

    # A record without a status, or one that is not a JSON object, is
    # skipped instead of raising.
    if not isinstance(data, dict) or data.get('status') != '200':
        return ()

    language = data.get('languages', 'none')

    try:
        url = data['url']
        return (ts,
                url,
                tldextract.extract(url).suffix,
                data['length'],
                data['offset'],
                data['filename'],
                language)
    except Exception:
        # Best effort: one malformed record must not abort a whole index
        # file. Unlike a bare `except:` this lets KeyboardInterrupt through.
        return ()

def list_multiprocessing(param_lst,
                         func,
                         **kwargs):
    """Apply `func` to every item of `param_lst` in a process pool.

    Args:
        param_lst: Iterable of items; each item is passed to `func` as its
            single positional argument.
        func: Callable run in the worker processes.
        **kwargs: `workers` (optional) is the number of pool processes and
            is consumed here; when omitted, Pool chooses its own default.
            All remaining keyword arguments are forwarded to every `func` call.

    Returns:
        list: the results of `func`, in the order of `param_lst`.

    NOTE(review): the notebook run of this helper stalled at 0%; presumably
    the worker processes could not load functions defined inside the
    notebook - confirm, and consider moving `func` into an importable module.
    """
    # `workers` used to be mandatory (KeyError when missing).
    workers = kwargs.pop('workers', None)

    # Each task carries its own position so results can be re-ordered below.
    apply_lst = [([params], func, i, kwargs) for i, params in enumerate(param_lst)]

    with Pool(workers) as p:
        result = list(tqdm(p.imap(_apply_lst, apply_lst), total=len(apply_lst)))

    # lists do not need such sorting, but this can be useful later
    result = sorted(result, key=lambda x: x[0])
    return [_[1] for _ in result]


def _apply_lst(args):
    params, func, num, kwargs = args
    return num, func(*params,**kwargs)  

In [7]:
# def process_index_file(file_name):
#     print('Unzipping index file ... ')
    
# #     df_name = file_name.replace('.gz','.feather')
#     df_name = file_name.split('/')[-1].replace('.gz', '.feather')    
#     file_unzipped = file_name.split('.gz')[0]

#     with gzip.open(file_name, 'rb') as f_in:
#         with open(file_unzipped, 'wb') as f_out:
#             shutil.copyfileobj(f_in, f_out)

#     lines = read_every_line(file_unzipped, 1e8)

#     print('{} lines extracted'.format(len(lines)))
    
#     print('Pre-processing index lines ... ')
#     out = list_multiprocessing(lines,
#                                process_index_file_line,
#                                workers=8)
    
#     # filter our blank lines
#     out =  [_ for _ in out if _ != ()]

#     print('Index pre-processed ... ')

#     print('Processing index dataframe ... ')

#     ts_list       = [_[0] for _ in out]
#     url_list      = [_[1] for _ in out]
#     tld           = [_[2] for _ in out]
#     length_list   = [_[3] for _ in out]
#     offset_list   = [_[4] for _ in out]
#     warc_list     = [_[5] for _ in out]
#     language_list = [_[6] for _ in out]

#     cols = ['ts','url','tld','length','offset','warc','language']
#     df = pd.DataFrame(data={
#         'ts':ts_list,
#         'url':url_list,
#         'tld':tld,
#         'length':length_list,
#         'offset':offset_list,
#         'warc':warc_list,
#         'language':language_list}
#                       ,columns=cols)

#     df = df[df.language=='rus']
#     df['wet'] = df.warc.apply(lambda x: x.replace('/warc/','/wet/').replace('.warc.','.warc.wet.'))
#     df['wet'] = df['wet'].apply(lambda x: file_prefix + x)

#     print('Index dataframe is ready ... ')
    
#     os.remove(file_name) 
#     os.remove(file_unzipped) 

#     print('Files removed ... ')
    
#     df = df.dropna().drop_duplicates().reset_index(drop=True)
#     df.to_feather(df_name)
    
#     print('Df saved ... ')

In [None]:
process_index_file(first_file)

Unzipping index file ... 
302 lines extracted
Pre-processing index lines ... 


  0%|                                                                                          | 0/302 [00:00<?, ?it/s]

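Workaround: don't use Pool. The multiprocessing run above stalled at 0%, so the next cell parses the lines in the notebook process instead.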

In [13]:
def process_index_file(file_name, max_lines=1):
    '''Unzip a gzipped index file, parse it without Pool and save a feather file.

    The file is unzipped next to the input, its lines are parsed one by one
    with process_index_file_line in the current process, and the resulting
    dataframe (with an extra 'wet' URL column) is written to the current
    working directory as '<base name without .gz>.feather'.

    Relies on the module-level name `file_prefix` (assigned in another
    cell) as soon as at least one record is produced.

    Args:
        file_name: Path of the gzipped file; must end with '.gz'.
        max_lines: How many leading lines to parse. Defaults to 1 (this
            debugging variant only looked at the first line); pass None to
            parse every line.

    Raises:
        ValueError: if file_name does not end with '.gz'.
    '''
    if not file_name.endswith('.gz'):
        # Without the suffix the unzip target would equal the input path
        # and opening it for writing would truncate the file being read.
        raise ValueError('expected a .gz file, got {!r}'.format(file_name))

    print('Unzipping index file ... ')

    file_unzipped = file_name[:-len('.gz')]
    df_name = os.path.basename(file_unzipped) + '.feather'

    with gzip.open(file_name, 'rb') as f_in:
        with open(file_unzipped, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)

    lines = read_every_line(file_unzipped, 1e8)

    print('{} lines extracted'.format(len(lines)))

    print('Pre-processing index lines ... ')
    # Keep one tuple per line. The earlier code stored the single result
    # tuple itself, so the steps below iterated over its fields and indexed
    # into their characters. Slicing also copes with an empty file.
    selected = lines if max_lines is None else lines[:max_lines]
    out = [process_index_file_line(line) for line in selected]

    # filter out lines that produced no record
    out = [_ for _ in out if _ != ()]

    print('Index pre-processed ... ')

    print('Processing index dataframe ... ')

    cols = ['ts','url','tld','length','offset','warc','language']
    df = pd.DataFrame(out, columns=cols)

#     df = df[df.language=='rus']
    # Derive the WET file URL from the WARC path.
    df['wet'] = df.warc.apply(
        lambda x: file_prefix + x.replace('/warc/','/wet/').replace('.warc.','.warc.wet.'))

    print('Index dataframe is ready ... ')

    df = df.dropna().drop_duplicates().reset_index(drop=True)
    df.to_feather(df_name)

    print('Df saved ... ')

In [14]:
first_file = cc_indexes[1]

In [15]:
first_file

'../data/index_paths/cc-index.paths.gz'

In [16]:
process_index_file(first_file)

Unzipping index file ... 
302 lines extracted
Pre-processing index lines ... 
Index pre-processed ... 
Processing index dataframe ... 
Index dataframe is ready ... 
Df saved ... 


In [17]:
df = pd.read_feather('cc-index.paths.feather')

In [19]:
df

Unnamed: 0,ts,url,tld,length,offset,warc,language,wet


## Troubleshooting

In [20]:
file_name = first_file
# df_name = file_name.replace('.gz','.feather')
file_unzipped = file_name.split('.gz')[0]

In [21]:
lines = read_every_line(file_unzipped, 1e8)
print('{} lines extracted'.format(len(lines)))

302 lines extracted


In [22]:
lines[0]

'cc-index/collections/CC-MAIN-2019-47/indexes/cdx-00000.gz\n'

In [23]:
lines[1]

'cc-index/collections/CC-MAIN-2019-47/indexes/cdx-00001.gz\n'

In [24]:
    # cc_indexes = [_.replace('\n','') for _ in cc_indexes]
lines = [line.replace('\n','') for line in lines]

In [25]:
lines[0]

'cc-index/collections/CC-MAIN-2019-47/indexes/cdx-00000.gz'

In [26]:
lines[0].split('/')[-1]

'cdx-00000.gz'

In [33]:
cc_index = lines[0]
cc_index_file = cc_index.split('/')[-1]

In [27]:
file_dict = collections.OrderedDict()

In [30]:
file_prefix = 'https://commoncrawl.s3.amazonaws.com/'

In [34]:
file_dict[os.path.join(storage_folder, cc_index_file)] = file_prefix + cc_index

In [35]:
file_dict

OrderedDict([('../data/index_paths/cdx-00000.gz',
              'https://commoncrawl.s3.amazonaws.com/cc-index/collections/CC-MAIN-2019-47/indexes/cdx-00000.gz')])

In [38]:
file_dict = collections.OrderedDict()

# Map each local download path to its remote URL.
# Only the first 10 index files are queued here.
for cc_index in lines[:10]:
    cc_index_file = cc_index.split('/')[-1]
    file_dict[os.path.join(storage_folder, cc_index_file)] = file_prefix + cc_index

In [39]:
def process_index_file(file_name, workers=8):
    """Convert one downloaded, gzipped index file into a feather dataframe.

    Steps: unzip next to the input, parse every line with
    process_index_file_line through list_multiprocessing, build a dataframe
    with an extra 'wet' URL column, save it as
    '<file_name without .gz>.feather', then delete both the .gz file and
    its unzipped copy.

    Relies on the module-level name `file_prefix` (assigned in another
    cell) as soon as at least one record is produced.

    Args:
        file_name: Path of the gzipped index file; must end with '.gz'.
        workers: Number of pool processes (default 8, the value that was
            previously hard-coded).

    Raises:
        ValueError: if file_name does not end with '.gz'.
    """
    if not file_name.endswith('.gz'):
        # Without the suffix the unzip target would equal the input path
        # and opening it for writing would truncate the file being read.
        raise ValueError('expected a .gz file, got {!r}'.format(file_name))

    print('Unzipping index file ... ')

    file_unzipped = file_name[:-len('.gz')]
    df_name = file_unzipped + '.feather'

    with gzip.open(file_name, 'rb') as f_in:
        with open(file_unzipped, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)

    lines = read_every_line(file_unzipped,
                            1e8)

    print('{} lines extracted'.format(len(lines)))

    print('Pre-processing index lines ... ')
    out = list_multiprocessing(lines,
                               process_index_file_line,
                               workers=workers)

    # filter out lines that produced no record
    out = [_ for _ in out if _ != ()]

    print('Index pre-processed ... ')

    print('Processing index dataframe ... ')

    cols = ['ts','url','tld','length','offset','warc','language']
    df = pd.DataFrame(out, columns=cols)

#     df = df[df.language=='rus']
    # Derive the WET file URL from the WARC path.
    df['wet'] = df.warc.apply(
        lambda x: file_prefix + x.replace('/warc/','/wet/').replace('.warc.','.warc.wet.'))

    print('Index dataframe is ready ... ')

    df = df.dropna().drop_duplicates().reset_index(drop=True)
    df.to_feather(df_name)

    print('Df saved ... ')

    # Delete the inputs only after the dataframe is safely on disk, so a
    # failed save does not throw away the download.
    os.remove(file_name)
    os.remove(file_unzipped)

    print('Files removed ... ')

In [None]:
# Fetch every queued index file, convert it to a feather dataframe and
# release memory before moving on to the next one.
for i, entry in enumerate(tqdm(file_dict.items())):
    file_name, url = entry
    print('PROCESSING INDEX FILE [{}]/[{}] ...'.format(i, len(file_dict)))
    print('Downloading an index file {} ...'.format(file_name))
    save(url, file_name)
    process_index_file(file_name)
    gc.collect()
    print('Downloaded!')

  0%|                                                                                           | 0/10 [00:00<?, ?it/s]

PROCESSING INDEX FILE [0]/[10] ...
Downloading an index file ../data/index_paths/cdx-00000.gz ...
...100%, 659 MB, 3094 KB/s, 218 seconds passedUnzipping index file ... 
9815044 lines extracted
Pre-processing index lines ... 



  0%|                                                                                      | 0/9815044 [00:00<?, ?it/s]

In [15]:
z

In [16]:
file_unzipped

'../data/index_paths/cc-index.paths'

In [17]:
df_name

'../data/index_paths/cc-index.paths.feather'

In [18]:
df_name = file_name.split('/')[-1].replace('.gz', '.feather')

In [19]:
df_name

'cc-index.paths.feather'

In [8]:
with gzip.open(file_name, 'rb') as f_in:
    with open(file_unzipped, 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)

FileNotFoundError: [Errno 2] No such file or directory: '../data/index_paths/cc-index.paths'