<a href="https://colab.research.google.com/github/ivankozlovcodes/aivscovid19-colabs/blob/master/data_processing/springer_preprocessing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Preprocessing for the Springer dataset

Location on the bucket: gs://middleware-bucket/springer/
Description: CSV files of 10,000 records each; approximately 1,000,000 articles in total.


NOTES:

BigQuery schema reference:
* Title
* Authors
* DOI (Digital Object Identifier)
* Abstract
* Date
* Full body
* Source (the journal the document was obtained from)
* Source impact factor (tiers)
* Search keyword (used to find this document)
* Category (genomics for example)
* Licensing
* Date of the document acquisition
* Quantity of citations
* Organization affiliated
* Keywords
* References
* Link
* Extra link


In [0]:
!pip install apache_beam[gcp] --quiet

[K     |████████████████████████████████| 3.4MB 2.8MB/s 
[K     |████████████████████████████████| 61kB 9.0MB/s 
[K     |████████████████████████████████| 225kB 29.9MB/s 
[K     |████████████████████████████████| 59.2MB 62kB/s 
[K     |████████████████████████████████| 1.2MB 45.1MB/s 
[K     |████████████████████████████████| 81kB 10.4MB/s 
[K     |████████████████████████████████| 51kB 7.5MB/s 
[K     |████████████████████████████████| 122kB 51.4MB/s 
[K     |████████████████████████████████| 174kB 47.1MB/s 
[K     |████████████████████████████████| 92kB 9.9MB/s 
[K     |████████████████████████████████| 235kB 40.5MB/s 
[K     |████████████████████████████████| 143kB 57.7MB/s 
[K     |████████████████████████████████| 112kB 61.6MB/s 
[?25h  Building wheel for httplib2 (setup.py) ... [?25l[?25hdone
  Building wheel for avro-python3 (setup.py) ... [?25l[?25hdone
  Building wheel for oauth2client (setup.py) ... [?25l[?25hdone
  Building wheel for hdfs (setup.py) ... 

In [0]:
# Colab-only helper module; this cell will not work outside Google Colab.
from google.colab import auth

# Authenticate the current Colab user.
# NOTE(review): presumably this is what grants the gsutil and Beam commands below
# access to the GCS bucket and BigQuery project — confirm.
auth.authenticate_user()

### CSV header

In [0]:
!gsutil cat gs://middleware-bucket/springer/1-10001records.csv | head -1

html_url,pdf_url,title,name,creators,editors,open_access,doi,type,date,abstract


### BigQuery connection
See here: https://beam.apache.org/documentation/io/built-in/google-bigquery/

In [0]:
%%writefile pipeline.py
import io
import sys
import csv
from datetime import datetime
from collections import namedtuple

import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

# Destination BigQuery tables: one for open-access rows, one for the rest.
# TODO(ivan): remove -- both currently point at the `test` dataset; the
# commented-out alternative was 'ai-vs-covid19:BigBioMedBERT2.springer_com_use'.
table_spec_open = 'ai-vs-covid19:test.springer_comm_use'
table_spec_no_open = 'ai-vs-covid19:test.springer_non_comm_use'

# Column definitions shared by both destination tables, in
# 'name:TYPE, name:TYPE' form. The column names must match the keys
# produced by rebuild_dictionary (including the 'aquisition_date' spelling).
table_schema = ', '.join((
    'title:STRING',
    'type:STRING',
    'link:STRING',
    'extra_link:STRING',
    'authors:STRING',
    'editors:STRING',
    'abstract:STRING',
    'doi:STRING',
    'date:DATE',
    'keywords:STRING',
    'aquisition_date:DATE',
))

# Files to read.
# TODO(ivan): remove -- this is a single shard used while testing. Other
# patterns that were tried:
#   'gs://middleware-bucket/springer/*.csv'
#   'gs://middleware-bucket/springer/10*'
#   'asd.csv'
input_pattern = 'gs://middleware-bucket/springer/1-10001records.csv'

class SplitData(beam.DoFn):
  """Routes each CSV row to one of two tagged outputs by its `open_access` field."""

  # Tag names of the two outputs; also used by the caller in with_outputs().
  OPEN_ACCESS = 'OPEN_ACCESS'
  NO_OPEN_ACCESS = 'NO_OPEN_ACCESS'

  def process(self, element):
    """Yield `element` wrapped in a TaggedOutput.

    Args:
      element: a mapping with an 'open_access' key.

    Yields:
      A TaggedOutput tagged NO_OPEN_ACCESS when the 'open_access' value is
      exactly the string 'false', and tagged OPEN_ACCESS for any other value.
    """
    not_open = element['open_access'] == 'false'
    tag = self.NO_OPEN_ACCESS if not_open else self.OPEN_ACCESS
    yield beam.pvalue.TaggedOutput(tag, element)

def rebuild_authors_list(authors):
  """Turn 'Last, First;Last, First' into 'First Last;First Last'.

  Each ';'-separated entry is split on ',' and its pieces are emitted in
  reverse order, joined by a single space.

  Args:
    authors: ';'-separated names, each written as comma-separated parts.
      May be empty or None.

  Returns:
    The rebuilt names joined by ';'. Returns '' for empty or None input.
    Entries that contain no text (e.g. from a trailing ';') are dropped.
  """
  if not authors:
    return ''

  rebuilt = []
  for entry in authors.split(';'):
    # Strip every piece on its own: stripping only the joined result leaves a
    # double space inside names that follow a '; ' separator
    # (' Doe, Jane' would become 'Jane  Doe').
    pieces = [piece.strip() for piece in entry.split(',')]
    name = ' '.join(piece for piece in reversed(pieces) if piece)
    if name:
      rebuilt.append(name)
  return ';'.join(rebuilt)

def clean_abstract(abstract):
  """Return `abstract` with every line trimmed and empty lines removed.

  Args:
    abstract: text whose lines are separated by '\\n'.

  Returns:
    The non-empty, whitespace-stripped lines joined by '\\n'.
  """
  trimmed = (raw.strip() for raw in abstract.split('\n'))
  return '\n'.join(text for text in trimmed if text)

def rebuild_dictionary(csv_row_dict):
  """Map one CSV row onto the column names of the BigQuery table schema.

  Args:
    csv_row_dict: a mapping with the keys 'title', 'type', 'html_url',
      'pdf_url', 'creators', 'editors', 'abstract', 'doi' and 'date'.

  Returns:
    A dict keyed by the destination column names. 'authors' and 'editors' are
    passed through rebuild_authors_list and 'abstract' through clean_abstract;
    'keywords' and 'aquisition_date' are fixed values for this import.
  """
  abstract = clean_abstract(csv_row_dict['abstract'])
  return {
      'title': csv_row_dict['title'],
      'type': csv_row_dict['type'],
      'link': csv_row_dict['html_url'],
      'extra_link': csv_row_dict['pdf_url'],
      'authors':  rebuild_authors_list(csv_row_dict['creators']),
      'editors': rebuild_authors_list(csv_row_dict['editors']),
      'abstract': abstract,
      'doi': csv_row_dict['doi'],
      # The destination column is a DATE: an empty string is not a valid date,
      # so a missing value is sent as None (NULL) instead.
      'date': csv_row_dict['date'] or None,
      # Hard-coded for every row of this dataset.
      'keywords': 'biochemistry',
      # Zero-padded YYYY-MM-DD, the format documented for loading DATE values.
      # The key keeps the 'aquisition' spelling because it must match the
      # column name in the table schema.
      'aquisition_date': '2020-03-16',
  }

def get_csv_reader(readable_file):
  """Return a csv.DictReader over the decoded contents of `readable_file`.

  Args:
    readable_file: an object whose open() method returns a binary stream
      (here, the elements produced by fileio.ReadMatches).

  Returns:
    A csv.DictReader that yields one dict per CSV row, keyed by the header.
  """
  # Decode explicitly instead of relying on the worker's locale default
  # (assumes the CSV files are UTF-8 -- TODO confirm), and pass newline='' as
  # the csv module requires so newlines embedded in quoted fields are handed
  # to the parser untouched.
  text_stream = io.TextIOWrapper(
      readable_file.open(), encoding='utf-8', newline='')
  return csv.DictReader(text_stream)

# Parse Beam options from the command line. argv[0] is the script path, not a
# flag, so it is excluded.
options = PipelineOptions(flags=sys.argv[1:])
# Save the main session so that the functions and imports defined at module
# level are available on the workers that execute the transforms.
options.view_as(SetupOptions).save_main_session = True

# Leaving the `with` block runs the pipeline and waits for it to finish, so
# there must be no explicit p.run() inside it: that would execute the pipeline
# a second time and, with WRITE_APPEND, write every row twice.
with beam.Pipeline(options=options) as p:
  # Match the input files, parse each one into row dicts and split the rows
  # into two tagged outputs by open-access status.
  data = (
    p
    | beam.io.fileio.MatchFiles(input_pattern)
    | beam.io.fileio.ReadMatches()
    # NOTE(review): presumably here to spread the matched files across
    # workers before parsing -- confirm.
    | beam.Reshuffle()
    | beam.FlatMap(get_csv_reader)
    | beam.ParDo(SplitData()).with_outputs(
        SplitData.OPEN_ACCESS,
        SplitData.NO_OPEN_ACCESS)
  )

  # Open-access rows: reshape to the table schema and append to their table.
  (
    data.OPEN_ACCESS
    | 'open_access' >> beam.Map(rebuild_dictionary)
    | 'open_access output' >> beam.io.WriteToBigQuery(
      table_spec_open,
      schema=table_schema,
      write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
  )
  # All remaining rows: same transformation, written to the other table.
  (
    data.NO_OPEN_ACCESS
    | 'no_open_access' >> beam.Map(rebuild_dictionary)
    | 'no_open_access output' >> beam.io.WriteToBigQuery(
      table_spec_no_open,
      schema=table_schema,
      write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
  )

Overwriting pipeline.py


In [0]:
# Authenticate the Colab user before launching the pipeline.
# NOTE(review): this repeats the authentication cell earlier in the notebook --
# presumably kept so the launch can be re-run after a runtime restart; confirm
# whether it is still needed.
from google.colab import auth
auth.authenticate_user()

In [0]:
# GCP project and the GCS bucket used for the job's temporary files; both are
# exported as environment variables so the shell command below can expand them.
%env PROJECT=ai-vs-covid19
%env BUCKET=apache-dataflow-temp

# Run pipeline.py (written by the %%writefile cell above) with the Dataflow
# runner, using gs://$BUCKET/tmp as its temp location.
# No --input flag is passed: pipeline.py hard-codes its input pattern. The flag
# that was previously commented out here was
#   --input gs://middleware-bucket/springer/1-10001records.csv
!python pipeline.py \
  --runner DataflowRunner \
  --project $PROJECT \
  --temp_location gs://$BUCKET/tmp

env: PROJECT=ai-vs-covid19
env: BUCKET=apache-dataflow-temp
Namespace()
['--runner', 'DataflowRunner', '--project', 'ai-vs-covid19', '--temp_location', 'gs://apache-dataflow-temp/tmp']
  | beam.ParDo(SplitData()).with_outputs(
  | MatchAll())
  | beam.ParDo(SplitData()).with_outputs(
  kms_key=transform.kms_key))
