## Create PostgreSQL database and table

In [28]:
import psycopg2
# Open a connection to the PostgreSQL server on localhost (database
# "postgres", user "postgres") and create a cursor for issuing SQL statements.
conn = psycopg2.connect("host=localhost dbname=postgres user=postgres")
cur = conn.cursor()

In [29]:
# Name of the PostgreSQL table that receives the exported BigQuery rows.
table_name = "bq_all"

# The column order matches the SELECT list of the BigQuery query in this
# notebook, because the CSV rows are later inserted positionally.
# IF NOT EXISTS keeps this cell re-runnable: a plain CREATE TABLE raises when
# the table is already there and leaves the open transaction in an aborted
# state until it is rolled back.
cur.execute("""
CREATE TABLE IF NOT EXISTS {}(
    id integer PRIMARY KEY,
    author text,
    title text,
    score integer,
    story_time timestamp,
    url text,
    commenters text,
    all_text text
)
""".format(table_name))

# DDL is transactional in PostgreSQL; commit to make the table visible.
conn.commit()

## Create and export BigQuery table to Google Cloud Storage as .csv files

In [3]:
from google.cloud import storage
from google.cloud import bigquery
import os

# Point the Google Cloud client libraries at a service-account key file.
# setdefault() leaves an already-exported GOOGLE_APPLICATION_CREDENTIALS
# untouched, so the hardcoded local path below is only a fallback and does not
# clobber credentials configured outside the notebook.
os.environ.setdefault(
    "GOOGLE_APPLICATION_CREDENTIALS",
    "/Users/jasminetan/Desktop/jt-dsi-capstone-5c0b7ccac053.json",
)

project = "jt-dsi-capstone"      # project passed when referencing the dataset for export
dataset_id = 'hacker_news'       # BigQuery dataset holding the destination table
table_id = 'all'                 # destination table for the query results
bucket_name = 'jt-dsi-capstone'  # Cloud Storage bucket that receives the export
file_name = 'all.csv*'           # the `*` wildcard lets the export be split across several files

# Standard-SQL query that builds one row per Hacker News story (type 'story',
# score > 10, timestamp after 2011-12-31 23:59:00) from
# `bigquery-public-data.hacker_news.full`.
#
# Structure, innermost first:
#   * p2, p1 and p0 are comment rows joined into reply chains: p2 is a direct
#     reply to the story (s.id = p2.parent), p1 replies to p2 and p0 replies
#     to p1.  All joins are inner joins, so only complete three-level chains
#     produce rows.  `type LIKE 'comment'` has no wildcard, i.e. it is an
#     exact match.
#   * The same joined subquery `a` is written out three times; each UNION ALL
#     branch keeps the commenter/comment pair of one chain level (1, 2, 3).
#   * The union `b` is grouped by story id.  Story columns are taken with
#     ANY_VALUE; distinct commenters and distinct comment texts are
#     concatenated with STRING_AGG.
#   * The outer SELECT prepends the story author to the commenter list (the
#     result ends with a trailing ', ') and joins title, story text and
#     comments into `all_text`.
#
# NOTE(review): BigQuery's CONCAT presumably returns NULL when any argument
# is NULL, so stories with a NULL `text` would get a NULL `all_text` --
# confirm against the data and wrap in IFNULL(...) if that is not intended.
query_str = """
SELECT
  c.id,
  c.author,
  c.title,
  c.score,
  c.story_time,
  c.url,
  CONCAT(c.author, ', ', c.commenters, ', ') AS commenters,
  CONCAT(c.title, '. ', c.story_text, '. ', c.comments) AS all_text
FROM (
  SELECT
    ANY_VALUE(b.id) AS id,
    ANY_VALUE(b.author) AS author,
    ANY_VALUE(b.title) AS title,
    ANY_VALUE(b.score) AS score,
    ANY_VALUE(b.story_time) AS story_time,
    ANY_VALUE(b.url) AS url,
    ANY_VALUE(b.story_text) AS story_text,
    STRING_AGG(DISTINCT commenter, ', ') AS commenters,
    STRING_AGG(DISTINCT comment, '. ') AS comments
  FROM ((
      SELECT
        DISTINCT a.id,
        a.author,
        a.title,
        a.score,
        a.story_time,
        a.url,
        a.story_text,
        a.commenter1 AS commenter,
        a.comment1 AS comment
      FROM (
        SELECT
          s.id,
          s.author,
          s.title,
          s.score,
          s.story_time,
          s.url,
          s.text AS story_text,
          p2.commenter AS commenter1,
          p2.text AS comment1,
          p1.commenter AS commenter2,
          p1.text AS comment2,
          p0.commenter AS commenter3,
          p0.text AS comment3
        FROM (
          SELECT
            id,
            `by` AS commenter,
            text,
            parent
          FROM
            `bigquery-public-data.hacker_news.full`
          WHERE
            type LIKE 'comment') p0
        JOIN (
          SELECT
            id,
            `by` AS commenter,
            text,
            parent
          FROM
            `bigquery-public-data.hacker_news.full`
          WHERE
            type LIKE 'comment') p1
        ON
          p1.id=p0.parent
        JOIN (
          SELECT
            id,
            `by` AS commenter,
            text,
            parent
          FROM
            `bigquery-public-data.hacker_news.full`
          WHERE
            type LIKE 'comment') p2
        ON
          p2.id=p1.parent
        JOIN (
          SELECT
            id,
            `by` AS author,
            score,
            `timestamp` AS story_time,
            url,
            text,
            title
          FROM
            `bigquery-public-data.hacker_news.full`
          WHERE
            type LIKE 'story'
            AND score > 10
            AND `timestamp` > '2011-12-31 23:59:00') s
        ON
          s.id=p2.parent) a)
    UNION ALL (
      SELECT
        DISTINCT a.id,
        a.author,
        a.title,
        a.score,
        a.story_time,
        a.url,
        a.story_text,
        a.commenter2 AS commenter,
        a.comment2 AS comment
      FROM (
        SELECT
          s.id,
          s.author,
          s.title,
          s.score,
          s.story_time,
          s.url,
          s.text AS story_text,
          p2.commenter AS commenter1,
          p2.text AS comment1,
          p1.commenter AS commenter2,
          p1.text AS comment2,
          p0.commenter AS commenter3,
          p0.text AS comment3
        FROM (
          SELECT
            id,
            `by` AS commenter,
            text,
            parent
          FROM
            `bigquery-public-data.hacker_news.full`
          WHERE
            type LIKE 'comment') p0
        JOIN (
          SELECT
            id,
            `by` AS commenter,
            text,
            parent
          FROM
            `bigquery-public-data.hacker_news.full`
          WHERE
            type LIKE 'comment') p1
        ON
          p1.id=p0.parent
        JOIN (
          SELECT
            id,
            `by` AS commenter,
            text,
            parent
          FROM
            `bigquery-public-data.hacker_news.full`
          WHERE
            type LIKE 'comment') p2
        ON
          p2.id=p1.parent
        JOIN (
          SELECT
            id,
            `by` AS author,
            score,
            `timestamp` AS story_time,
            url,
            text,
            title
          FROM
            `bigquery-public-data.hacker_news.full`
          WHERE
            type LIKE 'story'
            AND score > 10
            AND `timestamp` > '2011-12-31 23:59:00') s
        ON
          s.id=p2.parent) a)
    UNION ALL (
      SELECT
        DISTINCT a.id,
        a.author,
        a.title,
        a.score,
        a.story_time,
        a.url,
        a.story_text,
        a.commenter3 AS commenter,
        a.comment3 AS comment
      FROM (
        SELECT
          s.id,
          s.author,
          s.title,
          s.score,
          s.story_time,
          s.url,
          s.text AS story_text,
          p2.commenter AS commenter1,
          p2.text AS comment1,
          p1.commenter AS commenter2,
          p1.text AS comment2,
          p0.commenter AS commenter3,
          p0.text AS comment3
        FROM (
          SELECT
            id,
            `by` AS commenter,
            text,
            parent
          FROM
            `bigquery-public-data.hacker_news.full`
          WHERE
            type LIKE 'comment') p0
        JOIN (
          SELECT
            id,
            `by` AS commenter,
            text,
            parent
          FROM
            `bigquery-public-data.hacker_news.full`
          WHERE
            type LIKE 'comment') p1
        ON
          p1.id=p0.parent
        JOIN (
          SELECT
            id,
            `by` AS commenter,
            text,
            parent
          FROM
            `bigquery-public-data.hacker_news.full`
          WHERE
            type LIKE 'comment') p2
        ON
          p2.id=p1.parent
        JOIN (
          SELECT
            id,
            `by` AS author,
            score,
            `timestamp` AS story_time,
            url,
            text,
            title
          FROM
            `bigquery-public-data.hacker_news.full`
          WHERE
            type LIKE 'story'
            AND score > 10
            AND `timestamp` > '2011-12-31 23:59:00') s
        ON
          s.id=p2.parent) a)) b
  GROUP BY
    b.id) c"""

In [4]:
bq_client = bigquery.Client()

job_config = bigquery.QueryJobConfig()

# Destination table for the query results: dataset `dataset_id`, table
# `table_id`.
table_ref = bq_client.dataset(dataset_id).table(table_id)
job_config.destination = table_ref
job_config.use_legacy_sql = False

# The write_disposition specifies the behavior when writing query results
# to a table that already exists. With WRITE_TRUNCATE, any existing rows
# in the table are overwritten by the query results, so this cell can be
# re-run safely.
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

# Start the query, passing in the extra configuration.
query_job = bq_client.query(
    query_str,
    # Location must match that of the dataset(s) referenced in the query
    # and of the destination table. It is an argument of query() itself,
    # not a QueryJobConfig property (same convention as extract_table()).
    location='US',
    job_config=job_config)  # API request - starts the query

query_job.result()  # Waits for the query to finish.
print("Query finished")

Query finished


In [5]:
# Export the table produced by the query to Cloud Storage. The wildcard in
# file_name ends up in the URI, so the export may be written as several files.
dataset_ref = bq_client.dataset(dataset_id, project=project)
table_ref = dataset_ref.table(table_id)
destination_uri = 'gs://{}/{}'.format(bucket_name, file_name)

# API request; the location must match that of the source table.
extract_job = bq_client.extract_table(table_ref, destination_uri, location='US')

# Block until the export job has completed.
extract_job.result()

export_message = 'Exported {}:{}.{} to {}'.format(
    project, dataset_id, table_id, destination_uri)
print(export_message)

Exported jt-dsi-capstone:hacker_news.all to gs://jt-dsi-capstone/all.csv*


## Read .csv files and store in PostgreSQL DB

In [7]:
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)

# Collect every object in the bucket whose name starts with "all.csv".
blobs = bucket.list_blobs(prefix="all.csv")
blobs_list = list(blobs)

# Download blob number i to all_<i>.csv in the working directory and record
# the index so the loading cell knows which files to read.
file_indices = []
for shard_number, shard_blob in enumerate(blobs_list):
    shard_blob.download_to_filename('all_{}.csv'.format(shard_number))
    file_indices.append(shard_number)
    print('all_{}.csv downloaded'.format(shard_number))

In [39]:
import csv
import sys
# Lift the csv module's per-field size limit so very long fields can be
# parsed. sys.maxsize does not fit into a C long on every platform (e.g.
# Windows), in which case field_size_limit() raises OverflowError; back off
# until a value is accepted.
max_field_size = sys.maxsize
while True:
    try:
        csv.field_size_limit(max_field_size)
        break
    except OverflowError:
        max_field_size //= 10

conn = psycopg2.connect("host=localhost dbname=postgres user=postgres")
cur = conn.cursor()

# Build the statement once instead of once per row. Only the table name is
# formatted in; the row values are passed as bound parameters.
insert_sql = "INSERT INTO {} VALUES (%s, %s, %s, %s, %s, %s, %s, %s)".format(table_name)

for index in file_indices:
    # newline='' is what the csv module requires for correct handling of
    # line breaks embedded in quoted fields; the encoding is pinned to UTF-8
    # instead of relying on the platform default.
    with open('all_{}.csv'.format(index), 'r', newline='', encoding='utf-8') as f:
        reader = csv.reader(f)
        next(reader, None)  # Skip the header row (tolerates an empty file).
        try:
            for row in reader:
                cur.execute(insert_sql, row)
        except Exception:
            # A failed statement aborts the whole PostgreSQL transaction;
            # roll back so the connection stays usable, then surface the error.
            conn.rollback()
            raise
    # One transaction per file: a file is loaded completely or not at all.
    conn.commit()
    print('all_{}.csv loaded into {}'.format(index, table_name))

all_0.csv loaded into bq_all
all_1.csv loaded into bq_all
all_2.csv loaded into bq_all
all_3.csv loaded into bq_all
