### OTP Evidence Source Import

In [8]:
import fsspec
import pandas as pd
import os.path as osp
from data_source import catalog
from pyspark.sql.session import SparkSession
# Reuse the active Spark session if there is one, otherwise start a new one;
# `spark` is used below to convert the downloaded JSON files to parquet.
spark = SparkSession.builder.getOrCreate()

In [5]:
# Release of the Open Targets data bucket that the evidence files are read from
otp_version = '20.04'
url_prefix = (
    'https://storage.googleapis.com/open-targets-data-releases/'
    f'{otp_version}/input/evidence-files'
)

# One record per evidence source. Later cells add further keys to these
# records in place (local path, schema, row count, parquet path, catalog entry).
files = [
    {'source': source, 'url': f'{url_prefix}/{filename}'}
    for source, filename in [
        ('eva', 'eva-2020-03-20.json.gz'),
        ('l2g', 'genetics-portal-evidences-2020-03-12.json.gz'),
    ]
]

In [13]:
%%time
for item in files:
    # Copy the remote file into /tmp, named after the last segment of its URL
    source_url = item['url']
    remote = fsspec.open(source_url)
    path = '/tmp/' + source_url.rsplit('/', 1)[-1]
    print(f'Downloading url {item["url"]} to path {path}')
    remote.fs.download(source_url, path)
    # Record where the local copy lives; the conversion cell reads this key
    item['path'] = path

Downloading url https://storage.googleapis.com/open-targets-data-releases/20.04/input/evidence-files/eva-2020-03-20.json.gz to path /tmp/eva-2020-03-20.json.gz
Downloading url https://storage.googleapis.com/open-targets-data-releases/20.04/input/evidence-files/genetics-portal-evidences-2020-03-12.json.gz to path /tmp/genetics-portal-evidences-2020-03-12.json.gz


In [14]:
# Show the on-disk size of each downloaded .json.gz file
!du -sh /tmp/*.json.gz

8.3M	/tmp/eva-2020-03-20.json.gz
87M	/tmp/genetics-portal-evidences-2020-03-12.json.gz


In [52]:
%%time
for f in files:
    # Convert from json to parquet
    path = f['path'].replace('.json.gz', '.parquet')
    print(f'Converting {f["path"]} to {path}')
    df = spark.read.json(f['path'])
    # Keep the schema as a printable tree string for the catalog metadata.
    # NOTE(review): `_jdf` is a private PySpark handle and may change between releases.
    f['schema'] = df._jdf.schema().treeString()
    # Overwrite earlier output: the default save mode raises if the path
    # already exists, which made this cell fail whenever it was re-run.
    df.write.parquet(path, mode='overwrite')
    # Count rows from the parquet output rather than via `df.count()`, which
    # would parse the gzipped JSON one more time; the row count is the same.
    f['rows'] = spark.read.parquet(path).count()
    f['result'] = path

Converting /tmp/eva-2020-03-20.json.gz to /tmp/eva-2020-03-20.parquet
Converting /tmp/genetics-portal-evidences-2020-03-12.json.gz to /tmp/genetics-portal-evidences-2020-03-12.parquet
CPU times: user 28.4 ms, sys: 1.82 ms, total: 30.3 ms
Wall time: 1min 50s


In [53]:
# Show the on-disk size of each converted parquet output
!du -sh /tmp/*.parquet

7.1M	/tmp/eva-2020-03-20.parquet
48M	/tmp/genetics-portal-evidences-2020-03-12.parquet


In [55]:
%%time
for rec in files:
    # Create a catalog entry describing this artifact, then upload the local
    # parquet directory to the URL the entry reports.
    entry = catalog.create_entry(
        source='otpev',
        slug=rec['source'],
        version=f'v{otp_version}',
        metadata={'schema': rec['schema'], 'nrow': rec['rows']},
        format='parquet',
        type='directory',
    )
    # Keep the entry on the record; the registration cell reads this key
    rec['entry'] = entry

    destination = entry.url()
    print(f'Uploading {rec["result"]} to {destination}')
    entry.fs.upload(rec['result'], destination, recursive=True)

Uploading /tmp/eva-2020-03-20.parquet to gs://public-data-source/catalog/otpev/eva/v20.04/20200610T000000/data.parquet
Uploading /tmp/genetics-portal-evidences-2020-03-12.parquet to gs://public-data-source/catalog/otpev/l2g/v20.04/20200610T000000/data.parquet
CPU times: user 358 ms, sys: 99 ms, total: 457 ms
Wall time: 1min 30s


In [56]:
# Add each created entry to the global catalog, in the same order as `files`
for entry in (rec['entry'] for rec in files):
    catalog.add_entry(entry)

In [57]:
# Load the catalog as a pandas frame and display only the "otpev" source rows
catalog_df = catalog.load().to_pandas()
catalog_df.query('source_slug == "otpev"')

Unnamed: 0,source_slug,artifact_slug,artifact_version,artifact_created,artifact_formats,storage_slug,storage_scheme,storage_bucket,storage_root,storage_project,artifact_metadata_nrow,artifact_metadata_schema,entry
1,otpev,eva,v20.04,2020-06-10,"[{'name': 'parquet', 'type': 'directory', 'def...",gcs,gs,public-data-source,catalog,target-ranking,122219.0,root\n |-- access_level: string (nullable = tr...,"source=Source(slug='otpev', name=None, descrip..."
2,otpev,l2g,v20.04,2020-06-10,"[{'name': 'parquet', 'type': 'directory', 'def...",gcs,gs,public-data-source,catalog,target-ranking,1931303.0,root\n |-- access_level: string (nullable = tr...,"source=Source(slug='otpev', name=None, descrip..."
