In [11]:
from google.cloud import storage,bigquery
import json
import pandas as pd
import datetime
import numpy as np
import gcsfs
from pandas.io import gbq

# Data transformation functions

In [2]:
def fix_dict(dict_,date):
    dict_['date'] = date
    
    if dict_['longitude']=='null':
        dict_['longitude'] = None
        
    if dict_['latitude']=='null':
        dict_['latitude'] = None
    
    if dict_['location']=='null':
        dict_['location'] = None

    dict_['timestamp'] = dict_['timestamp'][:-3] # removing "+00" to match standard timestmap format
    
    dict_.pop('unique_key', None)
    dict_ = {k.capitalize():v for k,v in dict_.items()} #capitalizing first letters to match client's desired schema
    return dict_

# Creating a connection
Initializing google cloud storage client and creating a connection to the desired bucket.

In [3]:
%%capture # every connection with GCP will include this line to suppress a warning regarding authentication which does not affect our code 
storage_client = storage.Client()
bucket = storage_client.get_bucket('your_bucket')
gcs_file_system = gcsfs.GCSFileSystem(project="your_project")

# Data
Taking a first look at a random data point to understand our data structure.

In [4]:
blob_example = bucket.blob(r'2015-02-20/incidents.ndjson') # random date
data_json_example = json.loads(blob_example.download_as_string(client=None))

In [5]:
data_json_example

{'unique_key': 2016530815,
 'descript': 'THEFT',
 'time': '12:00:00',
 'address': '700 BLOCK RIO GRANDE ST',
 'longitude': 'null',
 'latitude': 'null',
 'location': 'null',
 'timestamp': '2015-02-20 18:00:00+00'}

# Creating the GBQ Table

In [6]:
%%capture
PROJECT_ID = "your_project"
DATASET_ID = "your_dataset"
TABLE_ID = "your_table"

client = bigquery.Client()

# 1) create table
schema = [
            bigquery.SchemaField("Descript", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("Date", "DATE", mode="REQUIRED"),
            bigquery.SchemaField("Time", "TIME", mode="REQUIRED"),
            bigquery.SchemaField("Address", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("Longitude", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("Latitude", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("Location", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("Timestamp", "STRING", mode="REQUIRED")
         ]

table = bigquery.Table(f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}", schema=schema)
table = client.create_table(table)

# Function creation
Now we will define the function responsible for taking a date string as an input, retrieving the information related to that particular date, and uploading this information into our Google Big Query table.

In [7]:
def load_data(date_string):
    global schema,table,client,gcs_file_system,cont
    rows_to_insert = []
    try:
        datetime.datetime.strptime(date_string, '%Y-%m-%d')
    except ValueError:
        raise ValueError("Incorrect date format,input should be YYYY-MM-DD")
        
    gcs_json_path = "gs://your_bucket/{}/incidents.ndjson".format(date_string)
    
    with gcs_file_system.open(gcs_json_path) as f:
        json_data = [json.loads(line) for line in f] # for accessing jsons with multiple dictionaries inside
    
    for single_json in json_data:
        single_json = fix_dict(single_json,date_string)
        rows_to_insert.append(single_json)
        cont = cont+1 # will be used to double check the amount of rows
        
    client.insert_rows(rows = rows_to_insert,table = table,selected_fields = schema)
    return cont

# Uploading the data to GBQ
And saving it in a dataframe.

In [8]:
blobs = storage_client.list_blobs(bucket)    
dates = [blob.name[0:10] for blob in blobs]

In [9]:
cont = 0
for date in dates:
    load_data(date)
    
print(cont) # checking the amount of rows

45122


In [19]:
%%capture
full_table_query = "SELECT * FROM `your_project.your_dataset.your_table`"
df = gbq.read_gbq(full_table_query,project_id = 'your_project') # saving the full table in a dataframe

In [20]:
df

Unnamed: 0,Descript,Date,Time,Address,Longitude,Latitude,Location,Timestamp
0,HARASSMENT,2015-11-24,12:00:00,1500 BLOCK GARDEN ST,,,,2015-11-24 18:00:00
1,DEBIT CARD ABUSE,2015-12-19,12:00:00,2200 BLOCK LAWNMONT AVE,,,,2015-12-19 18:00:00
2,THEFT OF SERVICE,2015-10-26,12:00:00,1000 BLOCK NORWOOD PARK BLVD,,,,2015-10-26 18:00:00
3,THEFT,2015-12-30,21:07:00,2500 BLOCK W ANDERSON LN,,,,2015-12-31 03:07:00
4,LOST PROP,2015-06-29,00:00:00,12500 BLOCK TREE LINE DR,,,,2015-06-29 06:00:00
...,...,...,...,...,...,...,...,...
45117,POSS CONTROLLED SUB/OTHER,2016-04-04,22:40:00,500 BLOCK E 7TH ST,,,,2016-04-05 04:40:00
45118,POSS CONTROLLED SUB/NARCOTIC,2016-04-04,21:42:00,1400 BLOCK WEBBERVILLE RD,,,,2016-04-05 03:42:00
45119,CRIMINAL TRESPASS NOTICE,2016-04-04,10:25:00,200 BLOCK E 6TH ST,,,,2016-04-04 16:25:00
45120,CRIMINAL MISCHIEF,2016-04-04,23:28:00,2100 BLOCK NOGALES TRL,,,,2016-04-05 05:28:00


# Demonstrational SQL Queries
All of the next filters could be easily done using the Pandas library. The following queries are for demonstrational purposes only.

In [None]:

# counting descripts 
descript_value_counts_query = """SELECT Descript,COUNT(Descript) AS value_counts 
                                 FROM `your_project.your_dataset.your_table`
                                 GROUP BY (Descript) 
                                 ORDER BY  value_counts DESC"""

# returning data containing geographical information
geo_data_query = """SELECT *
                    FROM `your_project.your_dataset.your_table`
                    WHERE Longitude is not null"""

# counting crimes per month. (With BigQuery, you cannot use the derived column within the same select,
# hence the subquery)
crimes_per_month_query = """SELECT year_month,COUNT(year_month) AS value_counts
                            FROM (
                                SELECT LEFT(Timestamp,7) AS year_month
                                FROM `your_project.your_dataset.your_table`
                                )
                            GROUP BY year_month
                            ORDER BY value_counts DESC
                            """