
**Questions:**
- Q1. What is the trend in crime over the past years?
- Q2. Which categories of crimes are the most common?
- Q3. In which boroughs is a particular category of crime most prevalent?

**Cleaning Data:**

- Filtering the data
- Handling missing values and anomalous data

**Transforming Data:**

- Extracting fields
- Computing metrics

Imperative programming: use loops to process the records one by one.

Functional programming: apply the same function to every record. Because each call is independent of the others, the records can be processed in parallel.
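
As a small illustration of the difference, the sketch below computes the same result in plain Python (no Spark), first with an explicit loop and then by applying a function to every record. The `records` list and the `to_year` helper are made up for the example.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical sample records: (offense, year as a string)
records = [("BURGLARY", "2006"), ("ROBBERY", "2007"), ("BURGLARY", "2008")]

def to_year(record):
    """Extract the year of a record as an integer."""
    return int(record[1])

# Imperative: visit the records one by one and build the result by hand.
years_loop = []
for record in records:
    years_loop.append(to_year(record))

# Functional: describe what to do with one record and apply it to all of them.
years_map = list(map(to_year, records))

# to_year has no side effects, so the calls are independent and the same
# function can be handed to a pool of workers unchanged.
with ThreadPoolExecutor(max_workers=2) as pool:
    years_parallel = list(pool.map(to_year, records))

print(years_loop, years_map, years_parallel)
```

All three lists hold the same values; only the way the work is expressed (and how easily it can be distributed) differs.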

## Functional Programming

- filter: filter records with conditions
- map: transform each record to another record
- reduce: combine records in a specified way
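
The three operations can be tried out on an ordinary Python list before moving to Spark. This is only a sketch with made-up sample data; the RDD methods of the same names follow the same idea but run on distributed data.

```python
from functools import reduce

# Hypothetical sample records: (offense, year)
records = [("BURGLARY", 2005), ("ROBBERY", 2007), ("BURGLARY", 2008), ("ROBBERY", 2010)]

# filter: keep only the records that satisfy a condition
recent = list(filter(lambda r: r[1] >= 2006, records))

# map: turn each record into another record (here: just the year)
years = list(map(lambda r: r[1], recent))

# reduce: combine the records pairwise into a single value (here: the latest year)
latest = reduce(lambda x, y: max(x, y), years)

print(recent)
print(years)
print(latest)
```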


```python

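import csv
from io import StringIO  # Python 3; the Python 2 `StringIO` module no longer exists
from collections import namedtuple

# `sc` is the SparkContext provided by the PySpark shell or notebook
path = "file:///Users...csv"
data = sc.textFile(path)

data.take(10)

header = data.first()
print(header)

# drop the header line (`<>` is Python 2 only; use `!=`)
dataWoHeader = data.filter(lambda x: x != header)
dataWoHeader.first()

# a plain split would also split on commas inside quoted fields (e.g. the location),
# hence the csv-based parse() below
dataWoHeader.map(lambda x: x.split(",")).take(10)

# turn the column names into valid Python identifiers
fields = header.replace(" ", "_").replace("/", "_").split(",")
print(fields)

Crime = namedtuple('Crime', fields)  # `verbose=True` was removed in Python 3.7

def parse(row):
    """Parse one CSV line into a Crime record."""
    reader = csv.reader(StringIO(row))
    row = next(reader)
    return Crime(*row)

crimes = dataWoHeader.map(parse)
crimes.first()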
crimes.first().Offense

crimes.map(lambda x: x.Offense).countByValue()
crimes.map(lambda x: x.Occurrence_Year).countByValue()

# drop records with a missing offense or year, then keep 2006 onwards
crimesFiltered = (crimes
                  .filter(lambda x: not (x.Offense == 'NA' or x.Occurrence_Year == ''))
                  .filter(lambda x: int(x.Occurrence_Year) >= 2006))

crimesFiltered.map(lambda x: x.Occurrence_Year).countByValue()

def extractCoords(location):
    location_lat = float(location[1:location.index(",")])
    location_lon = float(location[location.index(",")+1:-1])
    return (location_lat, location_lon)
    
# smallest latitude and smallest longitude in the data
crimesFiltered.map(lambda x: extractCoords(x.Location_1)).reduce(lambda x, y: (min(x[0], y[0]), min(x[1], y[1])))


crimesFiltered.filter(lambda x: x.Offense == 'BURGLARY').map(lambda x: x.Occurrence_Year).countByValue()

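import gmplot
# from_geocode is a classmethod that centers the map on the given place
# (recent gmplot versions may also require a Google Maps API key here)
gmap = gmplot.GoogleMapPlotter.from_geocode("New York City")

# an RDD cannot be indexed; pick the coordinate inside map() and collect the result
b_lats = crimesFiltered.map(lambda x: extractCoords(x.Location_1)[0]).collect()
b_lons = crimesFiltered.map(lambda x: extractCoords(x.Location_1)[1]).collect()

gmap.scatter(b_lats, b_lons, '#DE1515', size=40, marker=False)
gmap.draw("mymap.html")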

```
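
The parsing step can be checked without Spark. The sketch below uses a made-up header and row (the real file's columns may differ) to show why `csv.reader` is used instead of a plain `split(",")`: a quoted location field contains a comma of its own. `extract_coords` is a hypothetical variant of `extractCoords` written for this example.

```python
import csv
from collections import namedtuple
from io import StringIO

# Hypothetical header and row; the location field is quoted and contains a comma.
header = 'Offense,Occurrence Year,Location 1'
row = 'BURGLARY,2008,"(40.7128, -74.0060)"'

fields = header.replace(" ", "_").replace("/", "_").split(",")
Crime = namedtuple("Crime", fields)

# A naive split breaks the quoted location into two pieces.
naive = row.split(",")

# csv.reader respects the quotes and yields exactly one value per field.
parsed = Crime(*next(csv.reader(StringIO(row))))

def extract_coords(location):
    """Turn '(lat, lon)' into a (float, float) tuple."""
    lat, lon = location.strip("()").split(",")
    return (float(lat), float(lon))

print(len(naive), naive)
print(parsed)
print(extract_coords(parsed.Location_1))
```

The naive split yields four pieces for three columns, while the `csv`-based parse keeps the location intact so that the coordinates can be extracted afterwards.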



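- collect(): return all records to the driver as a list
- take(n): return the first n records
- first(): return the first record

- countByValue(): count the occurrences of each distinct value
- join(): merge two pair RDDs on their keys
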
Two types of RDD:
- Base RDD
- Pair RDD - each record is a (key, value) tuple, e.g. (word, count)
    - Summarize by keys:
        - reduceByKey(): sum, max, min (mean will not work, because averaging pairwise is not associative)
            - like reduce, it takes a function of 2 arguments
            - only combines values with the same key (reduce applies to all records)
            - is a transformation (reduce is an action)
        - combineByKey(): mean (see below)
    - Merge by keys:
        - join
        - leftOuterJoin
        - rightOuterJoin
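
To make the shape of the join results concrete, here is a plain-Python sketch of the three joins on small lists of pairs. The helper functions and the sample data are hypothetical; they only mimic the output format of the RDD methods, `(key, (left_value, right_value))`, with `None` standing in for a missing side.

```python
# Hypothetical pair data: daily traffic totals and game results keyed by date.
traffic = [("2014-04-01", 120), ("2014-04-02", 300), ("2014-04-03", 90)]
games = [("2014-04-02", "Win"), ("2014-04-05", "Loss")]

def inner_join(left, right):
    """Keep only keys present on both sides."""
    return [(k, (lv, rv)) for k, lv in left for rk, rv in right if k == rk]

def left_outer_join(left, right):
    """Keep every left key; the right value is None when there is no match."""
    right_keys = {k for k, _ in right}
    unmatched = [(k, (lv, None)) for k, lv in left if k not in right_keys]
    return sorted(inner_join(left, right) + unmatched, key=lambda kv: kv[0])

def right_outer_join(left, right):
    """Keep every right key; the left value is None when there is no match."""
    left_keys = {k for k, _ in left}
    unmatched = [(k, (None, rv)) for k, rv in right if k not in left_keys]
    return sorted(inner_join(left, right) + unmatched, key=lambda kv: kv[0])

print(inner_join(traffic, games))
print(left_outer_join(traffic, games))
print(right_outer_join(traffic, games))
```

The `None` on the right-hand side of a left outer join is what later makes it possible to tell matched keys from unmatched ones.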

## Summarizing Data Along Dimensions

```python

from datetime import datetime
def parseTraffic(row):
    DATE_FMT = "%m/%d/%Y %H:%M"
    row = row.split(",")
    row[0] = datetime.strptime(row[0], DATE_FMT)
    row[1] = int(row[1])
    return (row[0], row[1])
    
def parseGames(row):
    DATE_FMT = "%m/%d/%Y"
    row = row.split(",")
    row[0] = datetime.strptime(row[0], DATE_FMT).date()
    return (row[0], row[4])    
    
# `traffic` and `games` are assumed to be RDDs of CSV lines already loaded with sc.textFile
trafficParsed = traffic.map(parseTraffic)
gamesParsed = games.map(parseGames)

dailyTrend = trafficParsed.map(lambda x: (x[0].date(), x[1])).reduceByKey(lambda x, y: x + y)
dailyTrend.sortBy(lambda x: -x[-1]).take(10)  #sort in descending order

## Merging Pair RDDs

dailyTrendCombined = dailyTrend.leftOuterJoin(gamesParsed)
dailyTrendCombined.take(10)

def checkGameDay(row):
    # row is (date, (traffic, game)); game is None when the left outer join found no match
    if row[1][1] is None:
        return (row[0], row[1][1], "Regular Day", row[1][0])
    else:
        return (row[0], row[1][1], "Game Day", row[1][0])

dailyTrendbyGames = dailyTrendCombined.map(checkGameDay)
dailyTrendbyGames.take(10)

dailyTrendbyGames.sortBy(lambda x:-x[3]).take(10)

# average traffic on game day vs non game day
dailyTrendbyGames.map(lambda x: (x[2], x[3])).combineByKey(lambda value: (value,1), \
                                      lambda acc, value:(acc[0]+value, acc[1] + 1), \
                                      lambda acc1, acc2:(acc1[0]+acc2[0], acc1[1]+acc2[1])) \
                                      .mapValues(lambda x:x[0] / x[1]).collect()
```
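
The sketch below illustrates, in plain Python, why a mean cannot be computed by averaging values pairwise and how the (sum, count) accumulators used with `combineByKey` get around that. The two "partitions", their values, and the `combine_by_key` helper are made up to simulate how Spark processes each partition separately before merging the results.

```python
from functools import reduce

# Hypothetical (day type, daily traffic) pairs spread over two partitions.
partitions = [
    [("Game Day", 10), ("Regular Day", 4), ("Game Day", 20)],
    [("Game Day", 60), ("Regular Day", 6)],
]

# Naive attempt: averaging pairwise gives ((10 + 20) / 2 + 60) / 2, not the mean.
game_values = [v for part in partitions for k, v in part if k == "Game Day"]
pairwise_mean = reduce(lambda x, y: (x + y) / 2, game_values)
true_mean = sum(game_values) / len(game_values)

def create_combiner(value):
    """First value seen for a key in a partition: start a (sum, count) accumulator."""
    return (value, 1)

def merge_value(acc, value):
    """Further value for the same key in the same partition."""
    return (acc[0] + value, acc[1] + 1)

def merge_combiners(acc1, acc2):
    """Accumulators for the same key coming from different partitions."""
    return (acc1[0] + acc2[0], acc1[1] + acc2[1])

def combine_by_key(partitions):
    """Simulate combineByKey: accumulate per partition, then merge across partitions."""
    merged = {}
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = merge_value(local[key], value) if key in local else create_combiner(value)
        for key, acc in local.items():
            merged[key] = merge_combiners(merged[key], acc) if key in merged else acc
    return merged

sums_and_counts = combine_by_key(partitions)
means = {key: total / count for key, (total, count) in sums_and_counts.items()}

print(pairwise_mean, true_mean)
print(sums_and_counts)
print(means)
```

Sums and counts can be merged in any order, so the division is postponed until every partition has contributed.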

## [Marvel Social Universe](http://bioinfo.uib.es/~joemiro/marvel.html)

**Network:**
- vertices: the characters
- edges: relationships between the characters

**Similar networks:**
- webpages
- members of a social network
- Articles/Documents/Text

**Questions:**
- Q1. Find the most influential characters
- Q2. Build a co-occurrence network from the given data
- Q3. Find the most important cliques

```python
booksPath = "file:///User.../Books.txt"
charactersPath = "file:///User.../Characters.txt"
edgesPath = "file:///Users.../Edges.txt"

books = sc.textFile(booksPath)            # contains vertex name and book name
characters = sc.textFile(charactersPath)  # contains vertex name and character name
edges = sc.textFile(edgesPath)            # contains the full list of vertices, then the edge lines: a character followed by its list of books

# filter out the vertex section (lines containing '*' or '"'), keeping only the edge lines
def edgeFilter(row):
    return not ('*' in row or '"' in row)

edgesFiltered = edges.filter(edgeFilter)

characterBookMap = edgesFiltered.map(lambda x: x.split()).map(lambda x: (x[0], x[1:]))

def charParse(row):
    row = row.split(":")
    return (row[0][7:], row[1].strip())
    
characterLookup = characters.map(charParse).collectAsMap()    # collectAsMap() returns the data as a dictionary

characterStrength = characterBookMap.mapValues(lambda x: len(x)).map(lambda x: (characterLookup[x[0]], x[1])) \
                                    .reduceByKey(lambda x, y: x + y) \
                                    .sortBy(lambda x: -x[1])
characterStrength.take(10)
```

### Co-occurrence Network

Two entities co-occur when they appear together in some context, here two characters in the same book. The same idea underlies product recommendation.

- Step 1: characterBookMap (character, list of books) --> bookCharacterMap (book, list of characters)
- Step 2: bookCharacterMap (book, list of characters) --> pairs of characters (with duplicates)
- Step 3: pairs of characters --> (pair of characters, count)
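
The three steps can be sketched on a tiny in-memory example before running them on the full data. The character and book names below are placeholders, and the plain-Python containers stand in for the pair RDDs.

```python
from collections import Counter, defaultdict
from itertools import combinations

# Hypothetical input: character -> list of books the character appears in.
character_books = {
    "A": ["b1", "b2"],
    "B": ["b1", "b2"],
    "C": ["b2"],
}

# Step 1: invert the mapping to book -> list of characters.
book_characters = defaultdict(list)
for character, books in character_books.items():
    for book in books:
        book_characters[book].append(character)

# Step 2: every pair of characters sharing a book (pairs repeat across books).
# Sorting first makes ("A", "B") and ("B", "A") the same pair.
pairs = [
    pair
    for characters in book_characters.values()
    for pair in combinations(sorted(characters), 2)
]

# Step 3: count how often each pair occurs.
pair_counts = Counter(pairs)

print(dict(book_characters))
print(pair_counts.most_common())
```

Characters "A" and "B" share two books, so their pair ends up with a count of 2, while the pairs involving "C" are counted once.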

```python
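import itertools

# Step 1: (character, [books]) --> (book, [characters])
bookCharacterMap = characterBookMap.flatMapValues(lambda x: x).map(lambda x: (x[1], x[0])) \
                                   .reduceByKey(lambda x, y: x + "," + y) \
                                   .mapValues(lambda x: x.split(","))

# Step 2: all character pairs per book; sorting makes (a, b) and (b, a) the same key
cooccurenceMap = bookCharacterMap.flatMap(lambda x: list(itertools.combinations(sorted(x[1]), 2)))
# Step 3: count each pair
cooccurenceStrength = cooccurenceMap.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

cooccurenceStrength.take(10)

# flatten ((char1, char2), count) into (char1, char2, count)
cooccurenceEdges = cooccurenceStrength.map(lambda x: (x[0][0], x[0][1], x[1]))
sortedCooccurence = cooccurenceEdges.sortBy(lambda x: -x[2]).map(lambda x: (characterLookup[x[0]], characterLookup[x[1]], x[2]))
sortedCooccurence.filter(lambda x: 'SPIDER-MAN/PETER PARKER' in x).take(10)

sortedCooccurence.map(lambda x: x[2]).stats()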


import networkx as nx
G=nx.Graph()
edges = sortedCooccurence.map(lambda x: (x[0],x[1],{'weight':1000/x[2]})).take(50)

G.add_edges_from(edges)

import matplotlib.pyplot as plt
nx.draw_networkx(G, pos=nx.spring_layout(G))
plt.show()
```