<IMG SRC="https://github.com/jacquesroy/byte-size-data-science/raw/master/images/Banner.png" ALT="BSDS Banner" WIDTH=1195 HEIGHT=200>

## Accessing Socrata Open Data API

### 020-Socrata Datasets
Execute the next cell if you want to watch the video for this notebook on the `Byte Size Data Science` YouTube channel.

In [None]:
from IPython.display import IFrame

IFrame(src="https://www.youtube.com/embed/4C9ShcU--ek?rel=0&controls=0&showinfo=0", width=560, height=315)


In [None]:
# Library used to read datasets
!pip install sodapy

In [None]:
# Libraries needed in the notebook
import requests, json


## Searching the Catalog
Search for specific datasets and put the result in a Spark DataFrame.

For this example, we search the City of Chicago data portal for transportation datasets.

The data coming back is a JSON document that is quite elaborate. Spark does not seem to handle it properly, so we create a schema to help with the conversion.
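The catalog search in the next cells concatenates query parameters by hand. As a sketch, the same Discovery API URL can be built with the standard library's `urllib.parse.urlencode`, which also escapes the parameter values. The helper name `catalog_search_url` is hypothetical; the parameters (`only`, `domains`, `search_context`, `categories`, `limit`, `scroll_id`) are the ones the notebook already uses.

```python
from urllib.parse import urlencode

CATALOG_URL = "http://api.us.socrata.com/api/catalog/v1"


def catalog_search_url(domain, category, limit=10000, scroll_id=None):
    """Build a Discovery API URL listing the datasets of one category on one domain.

    Hypothetical helper: it only assembles the query string the notebook builds by hand.
    """
    params = {
        "only": "dataset",
        "domains": domain,
        "search_context": domain,
        "categories": category,
        "limit": limit,
    }
    # scroll_id is only needed after the first chunk of results
    if scroll_id:
        params["scroll_id"] = scroll_id
    return CATALOG_URL + "?" + urlencode(params)
```

For example, `catalog_search_url("data.cityofchicago.org", "Transportation")` produces the same URL as `urldatasets` plus `&limit=10000`.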

In [None]:
from pyspark.sql.types import *

classification = StructType([StructField('categories', ArrayType(StringType()), True),
                            StructField('domain_category',StringType(), True),
                            StructField('domain_metadata',ArrayType(MapType(StringType(), StringType()), True),True),
                            StructField('domain_tags',ArrayType(StringType()), True),
                            StructField('tags',ArrayType(StringType()), True)
                            ])

metadata = StructType([StructField('domain',StringType(), True),
                       StructField('license',StringType(), True)
                      ])

owner = StructType([StructField('display_name',StringType(), True),
                    StructField('id',StringType(), True)
                   ])

resource = StructType([StructField('attribution',StringType(), True),
                       StructField('columns_datatype',ArrayType(StringType()), True),
                       StructField('columns_description',ArrayType(StringType()), True),
                       StructField('columns_field_name',ArrayType(StringType()), True),
                       StructField('columns_format',ArrayType(StringType()), True),
                       StructField('columns_name',ArrayType(StringType()), True),
                       StructField('createdAt',StringType(), True),
                       StructField('description',StringType(), True),
                       StructField('download_count',LongType(), True),
                       StructField('id',StringType(), True),
                       StructField('name',StringType(), True),
                       StructField('page_views',MapType(StringType(), StringType()), True),
                       StructField('parent_fxf',StringType(), True),
                       StructField('provenance',StringType(), True),
                       StructField('type',StringType(), True),
                       StructField('updatedAt',StringType(), True)
                          ])

# The schema uses the above structures
catalogSchema = StructType([StructField('classification', classification, True),
                            StructField('link',StringType(), True),
                            StructField('metadata', metadata, True),
                            StructField('owner', owner,True),
                            StructField('permalink',StringType(), True),
                            # StructField('published_copy',published_copy, True),
                            StructField('resource', resource, True)
                           ])


In [None]:
# By default, the catalog API returns 100 records.
# We can get more with the pagination parameters offset and limit (up to 10000 records).
# Beyond that, we have to use the scroll_id parameter:
# the ID of the last result in the previously fetched chunk of results.

# In this example we should never go above a few hundred results, probably far fewer.
# We still write the loop as if many results could come back.

url="http://api.us.socrata.com/api/catalog/v1"
# Retrieve only datasets
urldatasets = url + "?only=dataset&domains=data.cityofchicago.org" + \
                    "&search_context=data.cityofchicago.org" + \
                    "&categories=Transportation"
limit=10000
scroll_id=""
all_records = dict(results=[])
while True :
    page = "&scroll_id=" + scroll_id + "&limit=" + str(limit)
    # print(urldatasets + page)
    response = requests.get(urldatasets + page)
    if response.status_code != 200 :
        print(response.status_code)
        break
    jsondoc = json.loads(response.text)
    results = jsondoc['results']
    # An empty chunk means there is nothing left to read
    if len(results) == 0 :
        break
    all_records['results'].extend(results)
    # The next request starts after the last resource ID of this chunk
    scroll_id = results[-1]['resource']['id']
    # Stop once we have every result counted in resultSetSize
    if len(all_records['results']) >= jsondoc['resultSetSize'] :
        break

catalog_df = spark.createDataFrame(all_records['results'], schema=catalogSchema )
catalog_df.createOrReplaceTempView('socrataCatalog')

In [None]:
# Get a quick summary of the records
spark.sql("""
      select metadata.domain,resource.id, substring(resource.updatedAt, 0, 10) updatedAt, trim(resource.name) name
      from socrataCatalog
      """).show(n=40,truncate=70,vertical=False)

## Reading a dataset
Since the catalog gives us a URL for each dataset, we could read a dataset directly with the `requests` library.
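Socrata serves each dataset at a resource endpoint of the form `https://{domain}/resource/{dataset_id}.json`, and SoQL query parameters carry a `$` prefix (`$limit`, `$offset`, `$where`, `$order`). The sketch below only builds such a URL; `soda_resource_url` is a hypothetical helper. Passing its result to `requests.get(...)` and calling `.json()` on the response should return the records as a list of dictionaries, much like sodapy does.

```python
from urllib.parse import urlencode


def soda_resource_url(domain, dataset_id, **soql):
    """Build a SODA resource URL; keyword arguments become $-prefixed SoQL parameters.

    Hypothetical helper, e.g. limit=5 becomes $limit=5.
    """
    base = f"https://{domain}/resource/{dataset_id}.json"
    params = {"$" + name: value for name, value in soql.items()}
    if not params:
        return base
    # Keep '$' readable in the query string; other characters are percent-encoded
    return base + "?" + urlencode(params, safe="$")
```

For example, `soda_resource_url("data.cityofchicago.org", "wrvz-psew", limit=5)` gives `https://data.cityofchicago.org/resource/wrvz-psew.json?$limit=5`.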

Socrata provides the SODA API (Socrata Open Data API) to read the data.<br/>
The Python client library used here, `sodapy`, is documented at: https://github.com/xmunoz/sodapy

See also: https://dev.socrata.com/consumers/getting-started.html

The SODA API limits the number of records returned per request to 50000.
You can use the offset and limit parameters to page through the records and read them all.

The offset could potentially be used to read new records to update a repository.
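Paging with offset and limit can be sketched as a generator that keeps requesting pages until a short (or empty) page comes back. `fetch_all` and its `fetch_page(offset, limit)` callback are hypothetical names; with sodapy, the callback could be `lambda offset, limit: client.get("wrvz-psew", offset=offset, limit=limit, order="trip_start_timestamp")`. Passing an `order` is an assumption made here so that consecutive pages neither overlap nor skip records.

```python
from typing import Callable, Iterator, Optional


def fetch_all(fetch_page: Callable[[int, int], list],
              page_size: int = 50000,
              max_records: Optional[int] = None) -> Iterator:
    """Yield records page by page using offset/limit pagination.

    fetch_page(offset, limit) must return at most `limit` records starting at `offset`.
    Stops after a page shorter than requested, or once max_records have been read.
    """
    offset = 0
    while max_records is None or offset < max_records:
        # Never ask for more than the remaining records when a cap is set
        limit = page_size if max_records is None else min(page_size, max_records - offset)
        page = fetch_page(offset, limit)
        yield from page
        offset += len(page)
        # A short (or empty) page means the dataset has no more records
        if len(page) < limit:
            break
```

Because it is a generator, records can be consumed one page at a time instead of being collected all at once.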

In [None]:
# Library used to read datasets
from sodapy import Socrata

## Dataset metadata
The API allows us to retrieve a dataset's metadata, which includes statistics for every column of the dataset. Since these statistics contain both a non-null value count and a null value count, adding the two gives the number of records in the dataset.
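The record count described above can be wrapped in a small helper. This sketch reads the same keys the notebook uses on `tripsMetadata` (`columns`, `cachedContents`, `not_null`, `null`) and, as an added precaution, skips columns that have no cached statistics. `record_count` is a hypothetical name, and returning `None` when no column has statistics is a choice made here.

```python
def record_count(metadata: dict):
    """Return null + non-null counts from the first column that has cached statistics.

    Hypothetical helper; returns None when no column carries statistics.
    """
    for col in metadata.get("columns", []):
        stats = col.get("cachedContents")
        if stats is not None:
            # The counts may arrive as strings, so convert them before adding
            return int(stats["not_null"]) + int(stats["null"])
    return None
```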

In [None]:
# Unauthenticated client only works with public data sets. Note 'None'
# in place of application token, and no username or password:
client = Socrata("data.cityofchicago.org", None)

In [None]:
# Get the metadata (including column statistics) of the taxi trips dataset using its ID
tripsMetadata = client.get_metadata("wrvz-psew")

# number of records in the dataset
taxiRecords = int(tripsMetadata['columns'][0]['cachedContents']['not_null']) + \
              int(tripsMetadata['columns'][0]['cachedContents']['null'])
print("Taxi records: " + str(taxiRecords))

In [None]:
for col in tripsMetadata['columns'] :
    print(col['fieldName'] + ": " + col['dataTypeName'])

In [None]:
# We can't apply a schema based on the metadata to the records,
# because the API returns numeric values as strings.
# If the values were typed, we could build the schema this way:
taxiSchema = StructType()
for col in tripsMetadata['columns'] :
    if col['dataTypeName'] in ('number', 'money') :
        taxiSchema.add(StructField(col['fieldName'], FloatType(), True))
    else :
        taxiSchema.add(StructField(col['fieldName'], StringType(), True))


## Large datasets
In some cases, we could run into memory issues if we collect all the records before creating a DataFrame. Instead, we can create several DataFrames and combine them with the `union` method.

For our purpose here, we only get a small subset of records.
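Combining several DataFrames can be sketched with `functools.reduce` over `DataFrame.union`. Note that `union` matches columns by position, while `unionByName` (available since Spark 2.3) matches them by name, which is safer when each chunk's schema was built separately. `union_all` is a hypothetical helper; it only relies on each object having `union` or `unionByName` methods.

```python
from functools import reduce


def union_all(frames, by_name=False):
    """Combine a non-empty sequence of DataFrames into one.

    by_name=False uses DataFrame.union (columns matched by position);
    by_name=True uses DataFrame.unionByName (columns matched by name).
    """
    frames = list(frames)
    if not frames:
        raise ValueError("union_all needs at least one DataFrame")
    if by_name:
        return reduce(lambda left, right: left.unionByName(right), frames)
    return reduce(lambda left, right: left.union(right), frames)
```

A possible use is `taxi_df = union_all(spark.createDataFrame(page, taxiSchema) for page in pages)`, where `pages` is a hypothetical iterable of record lists, for example pages of 1000 records fetched with `offset` and `limit`.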

In [None]:
from pyspark.sql.types import *

# sodapy converts the JSON returned by the API into a Python list of dictionaries.
# Getting the last 1000 records (using a large offset) times out!

#offset = taxiRecords - 1000
offset = 0

# results = client.get("wrvz-psew", offset=offset, limit=1000)   # a large offset causes a timeout
# results = client.get("wrvz-psew", where="trip_start_timestamp > '2017-07'", limit=1000)   # Taxi trips
# Taxi trips after 2017-07, newest first
results = client.get("wrvz-psew",
                     where="trip_start_timestamp > '2017-07'",
                     order="trip_start_timestamp DESC",
                     limit=1000)
result_list = list(results)

# Create a schema for the taxi data.
# Records may omit fields that are null, so collect the keys of every record
# (in first-seen order) instead of only those of the first record.
fieldNames = list(dict.fromkeys(key for res in result_list for key in res))
taxiSchema = StructType([StructField(key, StringType(), True) for key in fieldNames])

# Convert to Spark DataFrame
results_df = spark.createDataFrame(result_list,taxiSchema)
results_df.createOrReplaceTempView('datasetTable')


In [None]:
spark.sql("""
  select * from datasetTable
  order by trip_start_timestamp desc
  limit(10)
""").take(1)

## Casting columns
Every column was read as a string, so most of them don't have the proper type.

We could have looked at the data first and created a schema accordingly.
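Looking at the data first can be sketched as guessing a cast type for each column from its string values: `int` if every non-empty value parses as an integer, `double` if every one parses as a number, and `string` otherwise. The helper names are hypothetical and the rule is a simple heuristic chosen here: timestamps and nested values (such as location objects) stay `string`, so the timestamps still need `to_timestamp`.

```python
def infer_cast_type(values):
    """Guess a Spark SQL cast type ("int", "double" or "string") from sample values."""
    present = [v for v in values if v is not None and v != ""]
    if not present:
        return "string"
    for cast_type, convert in (("int", int), ("double", float)):
        try:
            for v in present:
                convert(v)
            return cast_type
        except (TypeError, ValueError):
            # TypeError covers nested values such as location dictionaries
            continue
    return "string"


def infer_cast_types(records):
    """Map every field seen in the records (first-seen order) to a guessed cast type."""
    names = list(dict.fromkeys(key for record in records for key in record))
    return {name: infer_cast_type([record.get(name) for record in records]) for name in names}
```

The resulting names and types could then be passed to `column(name).cast(castType)` in a `withColumn` loop.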

In [None]:
results_df.printSchema()

In [None]:
# Convert columns to the proper types
# This makes queries easier to write
from pyspark.sql.functions import column, to_timestamp

# Numeric columns and the type each one is cast to
castTypes = {
    "dropoff_centroid_longitude": "double",
    "pickup_community_area": "int",
    "dropoff_community_area": "int",
    "trip_seconds": "int",
    "dropoff_centroid_latitude": "double",
    "pickup_centroid_longitude": "double",
    "tips": "double",
    "tolls": "double",
    "trip_miles": "double",
    "pickup_centroid_latitude": "double",
    "fare": "double",
    "trip_total": "double",
}
timestampColumns = ["trip_start_timestamp", "trip_end_timestamp"]

results2_df = results_df
for colName, colType in castTypes.items():
    results2_df = results2_df.withColumn(colName, column(colName).cast(colType))
for colName in timestampColumns:
    results2_df = results2_df.withColumn(colName, to_timestamp(colName, "yyyy-MM-dd'T'HH:mm:ss.SSS"))

# Show one row to make sure it converted properly. The tricky ones are the timestamps
results2_df.show(1)

In [None]:
results2_df.printSchema()