In [None]:
import avro

### Avro Demo

Situation so far:

* Process streams of records using MapReduce (or MapReduce streaming)

* Input data is from either delimited text files or "free text" files in HDFS
  * Most data sets have a lot of structure -- they look more like rows in DB tables than like a flat sequence of fields

*  What is Structure / Schema Information?
  *  Fields have a type, types require validation
  *  Fields are optional or mandatory
  *  Aggregate fields:  arrays and records
  
* Fundamental operation performed by all mappers
  * Read the record
  * Split the record into fields on the delimiter
  * Validate fields (data type, mandatory fields present)
  * Access fields to do calculation

* Limitations due to flat records
  * Fields are defined (only) by position
    * Not natural / readable, so error prone
  * Code to do record parsing must be repeated every place the record is used
    * Having the same code in multiple places is never good software engineering
    
  * Changes to the schema will tend to break code (everywhere)
    * Change field delimiter
    * Change field order
    * Add new fields
    * Change data type
    * Delete unneeded fields
  
Ideally:
* The data schema lives with the data or is centrally defined, not redundantly defined in the processing code
* Processors can work with an "object-like" representation of each record
  * More readable, not dependent on the physical format of the record
* Processors react "appropriately" when the schema changes
  * No code change required for a non-breaking change
 

#### Sample Data Set

Embellished version of the City table

Fields (in order):

1.  id
1.  name
1.  country_code
1.  district
1.  population
1.  up to three neighborhoods
1.  mayor name
1.  year mayor elected

Schema requirements:

* id is required and must be a long integer
* name is required and must be non-empty
* country_code is required and must be a known country code
* district is optional
* population is required and must be an integer (read_city validates it but does not keep it in the output record)
* up to three neighborhoods, all optional
* mayor name is required but year elected is optional

In [None]:
# Country codes accepted by read_city; any other code is rejected.
country_codes = ["USA", "NLD", "AFG"]

def read_cities(path="city.txt"):
    """Read every city record from a comma-delimited text file.

    Args:
        path: file to read (defaults to "city.txt").

    Returns:
        A list of city dicts, one per non-blank line, as built by read_city.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: propagated from read_city for a malformed line.
    """
    with open(path) as cityfile:
        # Skip blank lines (e.g. a trailing newline at end of file) instead
        # of failing to unpack them into ten fields.
        return [read_city(line) for line in cityfile if line.strip()]

def read_city(line):
    """Parse and validate one comma-delimited city line into a dict.

    Expected field order (exactly 10 fields):
        id, name, country_code, district, population,
        neighborhood1, neighborhood2, neighborhood3,
        mayor_name, year_elected

    Returns:
        {"id": int, "country_code": str, "name": str, "district": str,
         "neighborhoods": [str, ...],
         "mayor": {"name": str} or {"name": str, "year_elected": int}}
        population is validated as an integer but is not included.

    Raises:
        ValueError: wrong number of fields, a missing mandatory field
            (id, name, mayor name), an unknown country code, or a
            non-integer id / population / year_elected.
    """
    (city_id, name, country_code,
     district, population,
     n1, n2, n3,
     mayor_name, year_elected) = line.strip().split(",")

    # Check mandatory fields
    if not (city_id and name and mayor_name):
        raise ValueError("Missing a field")

    # Check valid country code
    if country_code not in country_codes:
        raise ValueError(f"Bad country code {country_code}")

    # Parse and convert integer fields
    city_id = int(city_id)
    int(population)  # validate only; population is not carried into the record

    # Up to three neighborhoods; any of the three slots may be empty.
    neighborhoods = [n for n in (n1, n2, n3) if n]

    # Mayor is a nested structure: name is mandatory; year_elected is
    # optional and its key is omitted entirely when the field is empty.
    mayor = {"name": mayor_name}
    if year_elected:
        mayor["year_elected"] = int(year_elected)

    return {"id": city_id,
            "country_code": country_code,
            "name": name,
            "district": district,
            "neighborhoods": neighborhoods,
            "mayor": mayor}

In [None]:
#  How does this relate to MapReduce / HDFS?  Each line of city.txt is one
#  input record, and read_city is the parse-and-validate step that every
#  mapper consuming this file would have to repeat.
json_cities = read_cities()
print(json_cities)

Now view the Avro schema for this record type.

In [None]:
import avro.schema

# First just parse the schema to check that the declaration is valid syntax.
# The `with` block closes the .avsc file as soon as it has been read.
with open("cityv1.avsc", "rb") as schema_file:
    schema = avro.schema.Parse(schema_file.read())
print(schema)

In [None]:
# We already converted our data to Python dicts (JSON-shaped records) --
# now we can write an Avro file with the same data.

from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter

json_cities = read_cities()
with open("cityv1.avsc", "rb") as schema_file:
    schema = avro.schema.Parse(schema_file.read())

# Named cities_v1.avro because that is the file the read-back cell opens,
# and it matches the cities_v2 / cities_v3 naming used for later versions.
with DataFileWriter(open("cities_v1.avro", "wb"), DatumWriter(), schema) as writer:
    for record in json_cities:
        writer.append(record)

In [None]:
#  ??  What happens if the data doesn't conform to the schema??

In [None]:
# The payoff: each Avro record arrives as a dict keyed by field name,
# so there is no splitting, no positional indexing and no type parsing.

avro_file = open("cities_v1.avro", "rb")
with DataFileReader(avro_file, DatumReader()) as city_reader:
    for rec in city_reader:
        summary = ", ".join([
            rec['name'],
            rec['country_code'],
            rec['mayor']['name'],
            str(len(rec['neighborhoods'])),
        ])
        print(summary)

### Back to MapReduce

Recall the job that computes the average city population grouped by country code. Here are the mapper and reducer.

```
#!/usr/bin/env python
"""mapper.py"""

import sys
import string

# Input records look like this:
#   1,Kabul,AFG,Kabol,1780000
# The third field is the country code, the fifth field is the population.
# We emit (code, population)
#
# Notice that, unlike word count, we are not emitting one record per word;
# we are just emitting one record per line, so the inner for loop goes away.
#
# print(...) with a single argument works under both Python 2 and Python 3.

for line in sys.stdin:
    line = line.strip()
    if not line:
        continue  # skip blank lines instead of failing with IndexError
    fields = line.split(",")
    country_code = fields[2]
    population = fields[4]
    print('%s\t%s' % (country_code, population))

#########################################################
#!/usr/bin/env python
import sys

# Input is tab-delimited tuples of the form (code, population).
# Taking the average population per code is exactly like average
# word length per word category.
# Relies on all lines for one code being adjacent (grouped by key),
# since a change of code is what triggers emitting the previous group.

current_code = None
current_sum = 0.0
current_count = 0

for line in sys.stdin:
    line = line.strip()
    if not line:
        continue  # skip blank lines instead of failing to unpack them
    code, pop = line.split('\t', 1)

    if current_code == code:
        current_sum += float(pop)
        current_count += 1
    else:
        if current_code is not None:
            print('%s\t%f' % (current_code, current_sum / current_count))
        current_code = code
        current_sum = float(pop)
        current_count = 1

# Flush the last group. Guarding on current_code means empty input emits
# nothing; the old `current_code == code` test was None == None there and
# divided by a zero count.
if current_code is not None:
    print('%s\t%f' % (current_code, current_sum / current_count))
```


How do the mapper and reducer change when we switch to reading from Avro rather than text?

### Schema Changes

The schema changes when 
1. We start buying data from a new provider that supplies different records
2. Upstream calculation starts supplying different information

In that case
* The "writer" of the Avro data changes from v1 to v2
* Do all of the "readers" of the Avro data have to adopt v2, or can they continue to process the data with the v1 schema if they prefer?

This makes a huge difference:  suppose you have *many* readers company-wide, only some of them care about the v2 data.
* Does the whole company need to simultaneously switch to v2?
* If so, there is so much pain to be had!


#### Schema Change Example

Our data v2 has the following changes
* A new int attribute **area**
* Demote the **id** attribute from **long** to **int**
* Remove the mistaken country code "XYZ"

Can our old application still process data written with the new schema?

In [None]:
### First let's write some v2 data
# These records carry the new `area` field and use the v2 schema.

new_city_data = [
    {'id': 9001, 'name': 'Portland', 'area': 145, 'country_code': 'USA',
     'neighborhoods': ['Portland Heights'], 'mayor': {'name': 'Ted Wheeler'}},
    {'id': 9002, 'name': 'Vancouver', 'country_code': 'CAN', 'area': 44,
     'neighborhoods': [], 'mayor': {'name': 'Kennedy Stuart', 'year_elected': 2018}},
]

# `with` closes the .avsc file right after it is read.
with open("cityv2.avsc", "rb") as schema_file:
    schema = avro.schema.Parse(schema_file.read())

with DataFileWriter(open("cities_v2.avro", "wb"), DatumWriter(), schema) as writer:
    for record in new_city_data:
        writer.append(record)

In [None]:
## This is the case of a v1 application
## -- that knows only about the v1 schema --
## processing data written with the v2 schema.
## DatumReader is given both schemas so Avro can resolve each v2 record
## into the shape the v1 reader expects.

with open("cityv1.avsc", "rb") as schema_file:
    reader_schema = avro.schema.Parse(schema_file.read())
with open("cityv2.avsc", "rb") as schema_file:
    writer_schema = avro.schema.Parse(schema_file.read())

with DataFileReader(open("cities_v2.avro", "rb"), DatumReader(writer_schema, reader_schema)) as reader:
    for city in reader:
        print(city['name'] + ", " + city['country_code'] + ", " + city['mayor']['name'] + ", " + str(len(city['neighborhoods'])))

In [None]:
## Try a third schema where the mandatory attribute id is removed
v3_cities = [
    {'name': 'Bend', 'area': 145, 'country_code': 'USA',
     'neighborhoods': [], 'mayor': {'name': 'Sally Russell'}},
]

# `with` closes the .avsc file right after it is read.
with open("cityv3.avsc", "rb") as schema_file:
    schemav3 = avro.schema.Parse(schema_file.read())

with DataFileWriter(open("cities_v3.avro", "wb"), DatumWriter(), schemav3) as writer:
    for record in v3_cities:
        writer.append(record)

In [None]:
## Read the v3 file (no `id` field) with the v1 reader schema.
## Under Avro schema resolution, a reader field that is missing from the
## writer's data must have a default value; otherwise the read fails.
## So this cell is expected to raise unless cityv1.avsc gives `id` a
## default (TODO confirm against cityv1.avsc).

with open("cityv1.avsc", "rb") as schema_file:
    reader_schema = avro.schema.Parse(schema_file.read())
with open("cityv3.avsc", "rb") as schema_file:
    writer_schema = avro.schema.Parse(schema_file.read())

with DataFileReader(open("cities_v3.avro", "rb"), DatumReader(writer_schema, reader_schema)) as reader:
    for city in reader:
        print(city['name'] + ", " + city['country_code'] + ", " + city['mayor']['name'] + ", " + str(len(city['neighborhoods'])))