In [2]:
import numpy as np 

# Big data processing on a single machine with Python!

<div>
    <img src="portada.jpg" alt="many workers handling big data on a single computer">
</div>

In [4]:
import pandas  as pd
DATA_DIR="C:/_DATA/experimentation/"
checkins_df = pd.read_csv( DATA_DIR + 'Gowalla_totalCheckins.txt', delimiter='\t', header=None )


In [5]:
checkins_df.columns = ["user_id", "checkin_ts", "latitude", "longitud", "location_id"]
checkins_df1 = checkins_df[ checkins_df.location_id % 1000 == 1] 

## A (big data?) problem:

We are given data from a social networking site:

<table border="1" class="dataframe">
  <thead>
    <tr style="text-align: right;">
      <th></th>
      <th>user_id</th>
      <th>checkin_ts</th>
      <th>latitude</th>
      <th>longitud</th>
      <th>location_id</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <th>68389</th>
      <td>241</td>
      <td>2010-10-14T02:09:59Z</td>
      <td>47.611588</td>
      <td>-122.200418</td>
      <td>2158038</td>
    </tr>
    <tr>
      <th>73200</th>
      <td>247</td>
      <td>2010-09-29T18:34:05Z</td>
      <td>49.246997</td>
      <td>-123.002488</td>
      <td>21782</td>
    </tr>
    <tr>
      <th>13788</th>
      <td>71</td>
      <td>2010-03-20T20:20:47Z</td>
      <td>39.293855</td>
      <td>-94.720260</td>
      <td>547748</td>
    </tr>
    <tr>
      <th>63127</th>
      <td>222</td>
      <td>2009-12-17T18:33:13Z</td>
      <td>42.074845</td>
      <td>-87.740850</td>
      <td>152179</td>
    </tr>
    <tr>
      <th>73875</th>
      <td>247</td>
      <td>2010-07-08T23:59:37Z</td>
      <td>49.277759</td>
      <td>-122.858863</td>
      <td>13217</td>
    </tr>
  </tbody>
</table>

In [54]:
# print( df.iloc[ list(np.random.choice( 100000, 5 )), :].to_html()) 

Each row corresponds to a check-in event: a user checks in at a location at a given time.


The input dataset consists of about 6.4 million records.

### Problem: detect stalker-stalkee pairs


If $E$ and $R$ are two users, we define the **stalking measure** between $E$ and $R$
as the number of distinct locations $L$ such that $E$ visited $L$ and $R$ _also_ visited $L$ at a _later_ time.

We want to find the top 5 pairs with the highest stalking measure.
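As a minimal illustration of the definition, here is a brute-force sketch on a handful of made-up check-ins. The toy data and the helper name `stalking_measures` are hypothetical and only meant to make the definition concrete; this quadratic loop is not how the real dataset is processed.

```python
from collections import defaultdict

# Hypothetical toy check-ins: (user_id, checkin_ts, location_id).
# ISO-8601 timestamps in the same format compare correctly as strings.
checkins = [
    ("E", "2010-01-01T10:00:00Z", "cafe"),
    ("R", "2010-01-01T11:00:00Z", "cafe"),
    ("E", "2010-01-02T09:00:00Z", "gym"),
    ("R", "2010-01-02T12:00:00Z", "gym"),
    ("R", "2010-01-03T08:00:00Z", "park"),
    ("E", "2010-01-03T09:00:00Z", "park"),
]

def stalking_measures(checkins):
    """Map (stalkee, stalker) -> number of distinct locations where the
    stalker checked in after the stalkee."""
    locations = defaultdict(set)
    for e_user, e_ts, e_loc in checkins:
        for r_user, r_ts, r_loc in checkins:
            if e_loc == r_loc and e_user != r_user and e_ts < r_ts:
                locations[(e_user, r_user)].add(e_loc)
    return {pair: len(locs) for pair, locs in locations.items()}

measures = stalking_measures(checkins)
print(measures)
```

Here `R` follows `E` at two locations (cafe and gym), while `E` follows `R` at only one (park), so the measure is not symmetric.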

### Solution: with Pandas!

In [15]:
checkins_by_loc = (checkins_df1[['user_id', 'checkin_ts', 'location_id']] 
                      .set_index('location_id') ) 

In [16]:
chin_pairs = checkins_by_loc.join( checkins_by_loc, lsuffix='_ee', rsuffix='_er' ) 

In [17]:
pairs_filtered = (chin_pairs[(chin_pairs.checkin_ts_ee < chin_pairs.checkin_ts_er) &                                                 
                             (chin_pairs.user_id_ee != chin_pairs.user_id_er )]
                      .rename( columns= {"user_id_er" : "stalker",
                                         "user_id_ee" : "stalkee" }) 
                      .reset_index()
                      [["stalkee", "stalker", "location_id"]] )

In [18]:
final_result = ( pairs_filtered.drop_duplicates()                 
                     .groupby(["stalkee", "stalker"])
                     .agg( {"location_id" : "count"})
                     .rename( columns = { "location_id" : "location_count" } )                 
                     .sort_values('location_count', ascending=False) )

 * <span style="color:red"> MEMORY ERROR!</span>

## Why the out-of-memory error? 


* If there are $n_l$ check-ins at location $l$, then `chin_pairs` contains $n_l^2$ records for this location. 

   * So, a location with $n_l = 1000$ checkins generates $1,000,000$ records.

In total, we would get $\sum_l n_l^2$ records.

In [8]:
num_chins_by_loc = ( checkins_df.groupby( "location_id" )
                                .agg( {"location_id" : "count"} )
                                .rename( columns = {"location_id" : "n_l"} ) ) 

num_chins_by_loc.head(9).transpose()

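| location_id | 8904 | 8932 | 8936 | 8938 | 8947 | 8954 | 8956 | 8957 | 8958 |
|-------------|-----:|-----:|-----:|-----:|-----:|-----:|-----:|-----:|-----:|
| n_l         |   12 |   16 |   12 |  130 |  570 |   18 |   61 |   36 |   42 |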


In [22]:
# Select the n_l column so that .sum() returns a scalar rather than a one-element Series
num_chin_pairs = int( (num_chins_by_loc["n_l"] ** 2).sum() )
num_chin_pairs

561828204
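A rough back-of-the-envelope estimate shows why a self-join of this size cannot fit in the RAM of a typical laptop. The figures below are assumptions, not measurements: 8 bytes per stored value and five values per joined row (the location index, two user ids and two timestamps). Timestamps stored as Python strings would need considerably more.

```python
num_chin_pairs = 561_828_204   # number of joined rows computed above
values_per_row = 5             # location_id + 2 user ids + 2 timestamps
bytes_per_value = 8            # assumption: 64-bit integer or object pointer

lower_bound_bytes = num_chin_pairs * values_per_row * bytes_per_value
lower_bound_gb = lower_bound_bytes / 1e9
print(f"at least {lower_bound_gb:.1f} GB")
```

Under these assumptions the joined table alone needs more than 22 GB, before counting any temporary copies made while filtering.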

## Second attempt 
    
*  Sort the data by location (in memory)

*  For each location $L$, produce all tuples $(E,R,L)$ such that $E$ visited $L$ and $R$ visited $L$ at a later time. Store these on disk.

*  Sort the resulting files so that all tuples with the same $(E,R)$ end up in the same file. Delete the previous set of files and store the newly sorted files on disk.

*  For each $(E,R)$ combination count the number of distinct $L$s to compute the stalking measure. 
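The steps above can be sketched roughly as follows. This is not the code that was actually run: it is a simplified illustration in which the file-sorting step is replaced by hash partitioning (each tuple is routed to a bucket file chosen from its $(E,R)$ pair), and the names `top_stalking_pairs`, `num_buckets` and the bucket file names are all hypothetical.

```python
import csv
import heapq
import tempfile
from collections import defaultdict
from itertools import groupby
from pathlib import Path

def top_stalking_pairs(checkins, work_dir, num_buckets=4, k=5):
    """checkins: iterable of (user_id, checkin_ts, location_id) tuples."""
    work_dir = Path(work_dir)
    # Step 1: sort by location (in memory) so each location forms one group.
    by_location = sorted(checkins, key=lambda c: c[2])

    # Steps 2-3: emit (E, R, L) tuples, routing each one to a bucket file
    # chosen by hash((E, R)), so a given pair always lands in the same file.
    files = [open(work_dir / f"bucket_{i}.csv", "w", newline="")
             for i in range(num_buckets)]
    writers = [csv.writer(f) for f in files]
    for loc, group in groupby(by_location, key=lambda c: c[2]):
        group = list(group)
        for e_user, e_ts, _ in group:
            for r_user, r_ts, _ in group:
                if e_user != r_user and e_ts < r_ts:
                    bucket = hash((e_user, r_user)) % num_buckets
                    writers[bucket].writerow([e_user, r_user, loc])
    for f in files:
        f.close()

    # Step 4: load one bucket at a time, count distinct locations per pair,
    # and keep only the k best pairs seen so far.
    top = []
    for i in range(num_buckets):
        locations = defaultdict(set)
        with open(work_dir / f"bucket_{i}.csv", newline="") as f:
            for e_user, r_user, loc in csv.reader(f):
                locations[(e_user, r_user)].add(loc)
        bucket_counts = [(len(locs), pair) for pair, locs in locations.items()]
        top = heapq.nlargest(k, top + bucket_counts)
    return [(pair, count) for count, pair in top]

# Hypothetical toy data; ids are strings because CSV round-trips as text.
toy = [
    ("a", "2010-01-01T10:00:00Z", "1"),
    ("b", "2010-01-01T11:00:00Z", "1"),
    ("a", "2010-01-02T10:00:00Z", "2"),
    ("b", "2010-01-02T11:00:00Z", "2"),
    ("b", "2010-01-03T10:00:00Z", "3"),
    ("a", "2010-01-03T11:00:00Z", "3"),
]
with tempfile.TemporaryDirectory() as tmp:
    result = top_stalking_pairs(toy, tmp)
print(result)
```

Only one bucket of intermediate tuples is held in memory at a time during the counting step, which is what keeps the approach feasible when the full set of tuples does not fit in RAM.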

The procedure above looks a lot like **map-reduce**, but on a single computer...

 * Took 3 hours to code in Python
 * and **5 hours** to run to completion 
 

  * Is it possible to process _big data_ on a laptop?
  

  * Is it convenient to process _big data_ on a laptop?
    

  *  **Big data** (one definition):  
    * Data is big whenever it doesn't fit in the RAM of a _single_ computer.
 

  * Big data computation is _not necessarily_ distributed
  

## The concept of out-of-core computation

When your data doesn't fit in RAM but it does fit on your disk:
  
  * Load one chunk of input data, do something with it, write intermediate result to disk (if necessary)
  
  
  * Drop input data chunk (from memory)
  
  
  * Load the next chunk of data, do something with it, write intermediate result to disk (if necessary)
  
  
  * Rinse, repeat...  
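The loop above can be sketched with the standard library alone. The helper `read_in_chunks` and the tiny temporary file are hypothetical stand-ins for a real input that is too large for RAM; pandas offers the same pattern through the `chunksize` argument of `read_csv`.

```python
import csv
import os
import tempfile
from collections import Counter
from itertools import islice

def read_in_chunks(path, chunk_size):
    """Yield lists of at most chunk_size rows from a tab-separated file."""
    with open(path, newline="") as f:
        reader = csv.reader(f, delimiter="\t")
        while True:
            chunk = list(islice(reader, chunk_size))
            if not chunk:
                return
            yield chunk

# Hypothetical tiny file standing in for an input too large for RAM.
path = os.path.join(tempfile.mkdtemp(), "checkins.tsv")
with open(path, "w", newline="") as f:
    csv.writer(f, delimiter="\t").writerows(
        [(1, "2010-01-01T00:00:00Z", 10), (2, "2010-01-02T00:00:00Z", 10),
         (1, "2010-01-03T00:00:00Z", 20), (3, "2010-01-04T00:00:00Z", 10),
         (2, "2010-01-05T00:00:00Z", 20)])

checkins_per_location = Counter()
for chunk in read_in_chunks(path, chunk_size=2):
    # Only this chunk is in memory; fold it into the small running result.
    checkins_per_location.update(location_id for _, _, location_id in chunk)
    # The chunk is released when the loop variable is rebound.

print(checkins_per_location)
```

The running result (one counter per location) stays small, so nothing has to be written to disk here; a join such as the stalker problem would also need the intermediate write step.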
  
  

## But wait... 

A digression into _access_ latencies

| System Event |	Actual Latency	|
|--------------|-------------------:|
| One CPU cycle |	0.4 ns	| 
| Level 1 cache access |	0.9 ns	|
| Level 2 cache access |	2.8 ns	|
| Level 3 cache access | 28 ns | 
| Main memory access (DDR DIMM) | ~100 ns |
| SSD I/O | 50–150 μs | 
| Magnetic disk I/O | 1–10 ms | 
| Internet call: San Francisco to New York City | 65 ms |
| Internet call: San Francisco to Hong Kong | 141 ms |

**Latency**: the time you have to wait for the _first byte_ to arrive
 

## But wait... 

Scaling to human scale:  1 CPU cycle $\to$ 1 s ($\approx 1$ heartbeat) 

| System Event                |	Actual Latency	| Scaled Latency | Human event |
|-----------------------------|-----------------:|----------------|-----------|
| One CPU cycle |	0.4 ns	        | 1 s | 1 heartbeat | 
| Level 1 cache access |	0.9 ns	| 2 s | remembering something |
| Level 2 cache access |	2.8 ns	| 7 s | checking notebook | 
| Level 3 cache access | 28 ns      | 1 min | looking at a book  |
| Main memory access (DDR DIMM) | ~100 ns | 4 min | calling a friend  |
| SSD I/O | 50–150 μs                                   | 1.5–4 days | getting a reply by letter | 
| Magnetic disk I/O | 1–10 ms                           | 1–9 months | making a baby |
| Internet call: San Francisco to New York City | 65 ms | 5 years | finishing an undergrad |
| Internet call: San Francisco to Hong Kong | 141 ms    | 11 years | doing undergrad + PhD | 
 
 
**Latency**: the time you have to wait for the _first byte_ to arrive
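The scaled column follows from a single multiplication: every latency is stretched by the factor that turns one 0.4 ns CPU cycle into one second. A small sketch of that arithmetic (the function and variable names are illustrative):

```python
SCALE = 1.0 / 0.4e-9   # stretch factor: one 0.4 ns CPU cycle becomes 1 second

def scaled_seconds(latency_seconds):
    """Convert a real latency into its human-scale equivalent, in seconds."""
    return latency_seconds * SCALE

main_memory_min = scaled_seconds(100e-9) / 60                 # ~100 ns
ssd_days = scaled_seconds(150e-6) / 86_400                    # 150 microseconds
sf_to_nyc_years = scaled_seconds(65e-3) / (365 * 86_400)      # 65 ms

print(f"main memory: {main_memory_min:.1f} min")
print(f"slow SSD I/O: {ssd_days:.1f} days")
print(f"SF to NYC: {sf_to_nyc_years:.1f} years")
```

The results agree with the rounded entries in the table: about 4 minutes, about 4 days and about 5 years.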

## But wait...

How about _throughput_?

| Reading from source: |  Time taken to read 1000 MB  |
|----------------------| -----------------------------|
| Main memory DDR3 or DDR4 <br /> (L1/2 caches are faster) |  40 ms |
| Solid state drive (SSD - SATA)  |  750 ms |
| Magnetic hard drive (HDD)   | 10 s | 


### Conclusion: 


Not all disks are created equal!
  
  * Reading large amounts of data from a magnetic HDD is about 250x slower than from memory.


  * Doing the same from an SSD is only about 20x slower... could be acceptable!
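The slowdown factors can be checked directly from the throughput table. The dictionary below simply restates the table's read times for 1000 MB; the variable names are illustrative.

```python
# Seconds needed to read 1000 MB, taken from the throughput table.
read_time_s = {"memory": 0.040, "ssd": 0.750, "hdd": 10.0}

# Sustained throughput in MB/s for each source.
throughput_mb_s = {src: 1000 / t for src, t in read_time_s.items()}

# How many times slower each source is than main memory.
slowdown = {src: t / read_time_s["memory"] for src, t in read_time_s.items()}

for src in read_time_s:
    print(f"{src}: {throughput_mb_s[src]:.0f} MB/s, {slowdown[src]:.0f}x memory")
```

With these figures the SATA SSD comes out at roughly 19x the memory read time and the magnetic disk at 250x.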

## Second problem with out-of-core computation...

* A simple data analyst should **not** have to worry about splitting data into chunks, storing intermediate results on disk, releasing memory, reloading...



* This is **error-prone** and **tedious**, and it **distracts** us from the main goal: designing a nice data-analysis algorithm.



* What we want is a software framework that does this for us and abstracts all the complexity!


* Something like **Turicreate**

## Turicreate to the rescue!

### What is Turicreate?

   * Formerly GraphLab Create, which was closed-source.

   
   * Acquired by Apple Inc. and open-sourced   
   
   
   * Available at: https://github.com/apple/turicreate
   
   
   * Facilities for manipulating both structured and unstructured data 
   
   
   * Develop ML / DL models
 

### Why do we care?

   * Based on the **SFrame** library: SFrames are very much like pandas DataFrames, but *out-of-core*!
   
   
   * Very similar API $\to$ easy translation!

In [None]:
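# checkins_sf is assumed to be the check-in file already loaded as a turicreate SFrame (loading step not shown)
checkins = checkins_sf[["user_id", "location_id", "checkin_ts"]]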

In [None]:
chin_ps = ( checkins.join( checkins, on = 'location_id' )
                    .rename( {'checkin_ts' : 'checkin_ts_ee',
                              'checkin_ts.1' : 'checkin_ts_er',
                              'user_id' : 'stalkee' ,
                              'user_id.1' : 'stalker' }) )

In [None]:
pairs_filtered = chin_ps[ (chin_ps['checkin_ts_ee'] < chin_ps['checkin_ts_er']) &
                          (chin_ps['stalkee'] != chin_ps['stalker']) ]

In [11]:
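from turicreate import aggregate as agg

final_result = ( pairs_filtered[[ 'stalkee', 'stalker', 'location_id']]
                     .unique()
                     .groupby( ['stalkee','stalker'] ,
                               {"location_count" : agg.COUNT() })
                     .topk( 'location_count', k=5, reverse=False ) )
final_result.materialize()   # force evaluation of the lazy pipeline
final_result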

### It works!

        Inferred types from first 100 line(s) of file as
        column_type_hints=[int,str,float,float,int]
        ------------------------------------------------------
        Read 870755 lines. Lines per second: 520097
        Finished parsing file /home/ubuntu/turicreate_experiment/Gowalla_totalCheckins.txt
        Parsing completed. Parsed 6442892 lines in 5.43026 secs.

        +---------+-----------+----------------+
        | stalkee |  stalker  | location_count |
        +---------+-----------+----------------+
        |   1251  |   106819  |      388       |
        |  10410  |   10393   |      365       |
        |  40090  |   132961  |      361       |
        |   1404  |    1080   |      330       |
        |  18446  |   106815  |      326       |
        +---------+-----------+----------------+
        [5 rows x 3 columns]
        
* Took ~1700 s on an AWS Ubuntu machine with **4 GB** of RAM and a **16 GB SSD**.

## Other alternatives (in the Python ecosystem)


### **Ray**: https://github.com/ray-project/ray

<div>
    <img src="ray.png" />
 </div>

  
  * "Ray is a flexible, high-performance distributed execution framework."
  
  * `pip install ray`, but it did not work on Windows at the time of writing :( 
  


### **Dask:** https://dask.pydata.org/en/latest/

<br />
<div>
    <img src="dask.png" />
 </div>
 
* "Dask provides advanced parallelism for analytics, enabling performance at scale for the tools you love"
  * Runs on Windows!
  * Built on top of the standard PyData stack: `numpy` / `pandas` / `scikit-learn`
  * still not mature
    
  

## Summary


* It is possible and even convenient to process (moderately) big data on a laptop!

* When handling big data, access latencies and throughput matter!

* Not all disks are created equal!

* There are nice tools for out-of-core big data computation in the Python ecosystem!

## References 

The code shown can be found here: https://github.com/YuxiGlobal/data-analytics/tree/master/out_of_core_experiments

https://software.intel.com/en-us/articles/memory-performance-in-a-nutshell
    
http://www.prowesscorp.com/computer-latency-at-a-human-scale/
    
https://medium.com/@mateini_12893/python-for-big-data-computation-on-a-single-computer-c232046df3c3
    
https://www.analyticsvidhya.com/blog/2018/03/pandas-on-ray-python-library-make-processing-faster/


        

### Prize winners

1. @Aveldarrama
2. Ricardo Ruíz Cortés
3. Yovany Alvarez Correa 
4. Luis Eduardo Lopez
5. Cristian Orozco
