# Game Events Pipeline and Analysis
### Project 3: Kumar, Jot, Patrick
### MIDS W205: Summer 2021

This project builds a pipeline for game event data and analyzes that data. The data comes from a Flask application that is also part of this project.

# Pipeline Setup

The pipeline moves events through Kafka -> Spark -> HDFS, with each service running in a Docker container. Run the following commands from a terminal to set it up.

### Run the docker-compose.yml file

`docker-compose up -d`

### Create Kafka Topic

```
docker-compose exec kafka \
  kafka-topics \
    --create \
    --topic events \
    --partitions 1 \
    --replication-factor 1 \
    --if-not-exists \
    --zookeeper zookeeper:32181
```

### Spin up the Flask Application

```
docker-compose exec mids \
  env FLASK_APP=/w205/project-3-kumar-narayanan/game_api.py \
  flask run --host 0.0.0.0
```

### Generate Data using Apache Bench from a Custom Bash Script

`bash generate_data`
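
The `generate_data` script itself is not reproduced in this README. As a rough, hypothetical sketch of the kind of Apache Bench calls such a script could issue, the Python below builds `docker-compose exec mids ab ...` command lines for two endpoints that appear later in this document (`/delete_item` and `/play_game`). The helper name `ab_command` and the default request count are assumptions made for illustration only.

```python
import shlex

def ab_command(host, path, requests=10, base_url="http://localhost:5000"):
    """Build the argv list for one Apache Bench run inside the mids container."""
    return [
        "docker-compose", "exec", "mids",
        "ab", "-n", str(requests),
        "-H", "Host: " + host,
        base_url + path,
    ]

# Hosts and query strings are taken from commands shown later in this README.
commands = [
    ab_command("agitated_darwin.comcast.net", "/delete_item?item=crossbow", requests=50),
    ab_command("user8.comcast.net", "/play_game?weapon=crossbow"),
]

for argv in commands:
    # shlex.quote keeps the Host header and the "?" in the URL safe for a shell.
    print(" ".join(shlex.quote(part) for part in argv))
```

Each list could be passed to `subprocess.run(argv, check=True)` to actually send the requests once the containers are up.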

## Spark for Transformations from Kafka

Run the commands below in Jupyter Notebook, or execute `spark_commands.py` using:

`docker-compose exec spark spark-submit /w205/project-3-kumar-narayanan/spark_commands.py`

being sure to alter the path to the file as necessary.

### Read in the Events from Kafka

In [1]:
import json
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import udf

In [2]:
raw_events = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:29092") \
    .option("subscribe","events") \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()

In [3]:
raw_events.cache()

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

In [4]:
raw_events.show()

+----+--------------------+------+---------+------+--------------------+-------------+
| key|               value| topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------+---------+------+--------------------+-------------+
|null|[7B 22 75 73 65 7...|events|        0|     0|2021-07-31 02:45:...|            0|
|null|[7B 22 75 73 65 7...|events|        0|     1|2021-07-31 02:45:...|            0|
|null|[7B 22 75 73 65 7...|events|        0|     2|2021-07-31 02:45:...|            0|
|null|[7B 22 75 73 65 7...|events|        0|     3|2021-07-31 02:45:...|            0|
|null|[7B 22 75 73 65 7...|events|        0|     4|2021-07-31 02:45:...|            0|
|null|[7B 22 75 73 65 7...|events|        0|     5|2021-07-31 02:45:...|            0|
|null|[7B 22 75 73 65 7...|events|        0|     6|2021-07-31 02:45:...|            0|
|null|[7B 22 75 73 65 7...|events|        0|     7|2021-07-31 02:45:...|            0|
|null|[7B 22 75 73 65 7...|events|        0
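
The `value` column holds the raw bytes of each event. As a quick check of what those bytes contain, the plain-Python sketch below decodes the first five bytes printed in the table (`7B 22 75 73 65`). The truncated remainder of the payload is not reconstructed here, and UTF-8 is assumed as the encoding.

```python
# First five bytes of the `value` column as printed by raw_events.show().
prefix = bytes.fromhex("7B 22 75 73 65")

# Assumption: the payload is UTF-8 text. These bytes are plain ASCII,
# so they decode to the opening characters of a JSON object.
text = prefix.decode("utf-8")
print(text)  # prints {"use
```

This is why the next step casts `value` to a string before parsing it as JSON.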

### Load as String and Filter on Events
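Here we cast the Kafka `value` column to a human-readable string and filter the events by type. One DataFrame keeps every valid event (buy, sell, delete, play_game), and the others each keep a single event type. We also carry along the Kafka timestamp, cast to a string.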

In [5]:
@udf('boolean')
def is_valid_event(event_as_json):
    """Returns true if the event type is a buy, sell, delete, or play_game, otherwise returns False."""
    event = json.loads(event_as_json)
    return event['event_type'] in ('buy', 'sell', 'play_game', 'delete')

In [6]:
@udf('boolean')
def is_purchase(event_as_json):
    """Returns true if the event type is a buy, otherwise returns False."""
    event = json.loads(event_as_json)
    if event['event_type'] == 'buy':
        return True
    return False

In [7]:
@udf('boolean')
def is_sell(event_as_json):
    """Returns true if the event type is a sell, otherwise returns False."""
    event = json.loads(event_as_json)
    if event['event_type'] == 'sell':
        return True
    return False

In [8]:
@udf('boolean')
def is_delete(event_as_json):
    """Returns true if the event type is a delete, otherwise returns False."""
    event = json.loads(event_as_json)
    if event['event_type'] == 'delete':
        return True
    return False

In [9]:
@udf('boolean')
def is_play(event_as_json):
    """Returns true if the event type is a play_game, otherwise returns False."""
    event = json.loads(event_as_json)
    if event['event_type'] == 'play_game':
        return True
    return False

In [10]:
all_events = raw_events \
    .select(raw_events.value.cast('string').alias('raw'),raw_events.timestamp.cast('string')) \
    .filter(is_valid_event('raw'))
    
extracted_all_events = all_events.rdd.map(lambda x: Row(timestamp=x.timestamp, **json.loads(x.raw))).toDF()

In [11]:
purchase_events = raw_events \
    .select(raw_events.value.cast('string').alias('raw'),raw_events.timestamp.cast('string')) \
    .filter(is_purchase('raw'))
    
extracted_events = purchase_events.rdd.map(lambda x: Row(timestamp=x.timestamp, **json.loads(x.raw))).toDF()

In [12]:
sell_events = raw_events \
    .select(raw_events.value.cast('string').alias('raw'),raw_events.timestamp.cast('string')) \
    .filter(is_sell('raw'))
    
extracted_sell = sell_events.rdd.map(lambda x: Row(timestamp=x.timestamp, **json.loads(x.raw))).toDF()

In [13]:
delete_events = raw_events \
    .select(raw_events.value.cast('string').alias('raw'),raw_events.timestamp.cast('string')) \
    .filter(is_delete('raw'))
    
extracted_delete = delete_events.rdd.map(lambda x: Row(timestamp=x.timestamp, **json.loads(x.raw))).toDF()

In [14]:
play_events = raw_events \
    .select(raw_events.value.cast('string').alias('raw'),raw_events.timestamp.cast('string')) \
    .filter(is_play('raw'))
    
extracted_play = play_events.rdd.map(lambda x: Row(timestamp=x.timestamp, **json.loads(x.raw))).toDF()
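
Each `extracted_*` DataFrame above is built the same way: keep the messages whose `event_type` passes the filter, then merge the parsed JSON keys with the Kafka timestamp into one flat record. The plain-Python sketch below mirrors that step without Spark. The sample messages are made up for illustration, and the helper name `flatten` is an assumption; the field names follow the schema printed in the next section.

```python
import json

VALID_EVENT_TYPES = {"buy", "sell", "play_game", "delete"}

def flatten(raw, timestamp):
    """Merge a parsed JSON event with its timestamp, like Row(timestamp=..., **json.loads(raw))."""
    return dict(timestamp=timestamp, **json.loads(raw))

# Hypothetical messages: (raw JSON string, timestamp string).
messages = [
    ('{"user_id": "user7", "event_type": "buy", "item": "sword"}', "2021-07-31 02:45:00"),
    ('{"user_id": "user7", "event_type": "default", "item": "none"}', "2021-07-31 02:45:01"),
]

# Keep only valid event types, then flatten each surviving message.
records = [
    flatten(raw, ts)
    for raw, ts in messages
    if json.loads(raw)["event_type"] in VALID_EVENT_TYPES
]
print(records)
```

One consequence of this design is that an event whose JSON already contains a `timestamp` key would raise a `TypeError`, because the same keyword would be passed twice.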

### Print Schema and Show Data

In [15]:
extracted_all_events.printSchema()

root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- item: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- user_id: string (nullable = true)



In [16]:
extracted_all_events.show(5)

+------+--------------------+---------------+----+----------+-----+--------------------+---------------+
|Accept|                Host|     User-Agent|desc|event_type| item|           timestamp|        user_id|
+------+--------------------+---------------+----+----------+-----+--------------------+---------------+
|   */*|agitated_darwin.v...|ApacheBench/2.3|None|       buy|knife|2021-07-31 02:45:...|agitated_darwin|
|   */*|agitated_darwin.v...|ApacheBench/2.3|None|       buy|knife|2021-07-31 02:45:...|agitated_darwin|
|   */*|agitated_darwin.v...|ApacheBench/2.3|None|       buy|knife|2021-07-31 02:45:...|agitated_darwin|
|   */*|agitated_darwin.v...|ApacheBench/2.3|None|       buy|knife|2021-07-31 02:45:...|agitated_darwin|
|   */*|agitated_darwin.v...|ApacheBench/2.3|None|       buy|knife|2021-07-31 02:45:...|agitated_darwin|
+------+--------------------+---------------+----+----------+-----+--------------------+---------------+
only showing top 5 rows



In [17]:
extracted_events.printSchema()

root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- item: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- user_id: string (nullable = true)



In [18]:
extracted_events.show(5)

+------+--------------------+---------------+----+----------+-----+--------------------+---------------+
|Accept|                Host|     User-Agent|desc|event_type| item|           timestamp|        user_id|
+------+--------------------+---------------+----+----------+-----+--------------------+---------------+
|   */*|agitated_darwin.v...|ApacheBench/2.3|None|       buy|knife|2021-07-31 02:45:...|agitated_darwin|
|   */*|agitated_darwin.v...|ApacheBench/2.3|None|       buy|knife|2021-07-31 02:45:...|agitated_darwin|
|   */*|agitated_darwin.v...|ApacheBench/2.3|None|       buy|knife|2021-07-31 02:45:...|agitated_darwin|
|   */*|agitated_darwin.v...|ApacheBench/2.3|None|       buy|knife|2021-07-31 02:45:...|agitated_darwin|
|   */*|agitated_darwin.v...|ApacheBench/2.3|None|       buy|knife|2021-07-31 02:45:...|agitated_darwin|
+------+--------------------+---------------+----+----------+-----+--------------------+---------------+
only showing top 5 rows



In [19]:
extracted_sell.printSchema()

root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- item: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- user_id: string (nullable = true)



In [20]:
extracted_sell.show(5)

+------+--------------------+---------------+----+----------+-----+--------------------+---------------+
|Accept|                Host|     User-Agent|desc|event_type| item|           timestamp|        user_id|
+------+--------------------+---------------+----+----------+-----+--------------------+---------------+
|   */*|agitated_darwin.c...|ApacheBench/2.3|None|      sell|sword|2021-07-31 02:47:...|agitated_darwin|
|   */*|agitated_darwin.c...|ApacheBench/2.3|None|      sell|sword|2021-07-31 02:47:...|agitated_darwin|
|   */*|agitated_darwin.c...|ApacheBench/2.3|None|      sell|sword|2021-07-31 02:47:...|agitated_darwin|
|   */*|agitated_darwin.c...|ApacheBench/2.3|None|      sell|sword|2021-07-31 02:47:...|agitated_darwin|
|   */*|agitated_darwin.c...|ApacheBench/2.3|None|      sell|sword|2021-07-31 02:47:...|agitated_darwin|
+------+--------------------+---------------+----+----------+-----+--------------------+---------------+
only showing top 5 rows



In [21]:
extracted_delete.printSchema()

root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- item: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- user_id: string (nullable = true)



In [22]:
extracted_delete.show(5)

+------+--------------------+---------------+----+----------+--------+--------------------+---------------+
|Accept|                Host|     User-Agent|desc|event_type|    item|           timestamp|        user_id|
+------+--------------------+---------------+----+----------+--------+--------------------+---------------+
|   */*|agitated_darwin.c...|ApacheBench/2.3|  NA|    delete|crossbow|2021-07-31 02:47:...|agitated_darwin|
|   */*|agitated_darwin.c...|ApacheBench/2.3|  NA|    delete|crossbow|2021-07-31 02:47:...|agitated_darwin|
|   */*|agitated_darwin.c...|ApacheBench/2.3|  NA|    delete|crossbow|2021-07-31 02:47:...|agitated_darwin|
+------+--------------------+---------------+----+----------+--------+--------------------+---------------+



In [23]:
extracted_play.printSchema()

root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- item: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- user_id: string (nullable = true)



In [24]:
extracted_play.show(5)

+------+--------------------+---------------+----+----------+-----+--------------------+-------+
|Accept|                Host|     User-Agent|desc|event_type| item|           timestamp|user_id|
+------+--------------------+---------------+----+----------+-----+--------------------+-------+
|   */*|agitated_darwin.c...|ApacheBench/2.3|None| play_game|sword|2021-07-31 02:47:...|  user9|
|   */*|agitated_darwin.c...|ApacheBench/2.3|None| play_game|sword|2021-07-31 02:47:...|  user9|
|   */*|agitated_darwin.c...|ApacheBench/2.3|None| play_game|sword|2021-07-31 02:47:...|  user9|
|   */*|agitated_darwin.c...|ApacheBench/2.3|None| play_game|sword|2021-07-31 02:47:...|  user9|
|   */*|agitated_darwin.c...|ApacheBench/2.3|None| play_game|sword|2021-07-31 02:47:...|  user9|
+------+--------------------+---------------+----+----------+-----+--------------------+-------+
only showing top 5 rows



## Output to HDFS

In [25]:
extracted_all_events.write.mode("overwrite").parquet("tmp/all_events")
extracted_events.write.mode("overwrite").parquet("tmp/purchases")
extracted_sell.write.mode("overwrite").parquet("tmp/sell")
extracted_delete.write.mode("overwrite").parquet("tmp/delete")
extracted_play.write.mode("overwrite").parquet("tmp/play")

## Conduct Analysis in Presto

To run queries in Presto, first execute the following script:

  **docker-compose exec spark spark-submit /w205/project-3-kumar-narayanan/spark_commands.py**

followed by
  **docker-compose exec presto presto --server presto:8080 --catalog hive --schema default**


### Query 1 

##### How many players are there? 

Command: 
```
select count(distinct(user_id)) as distinct_users from all_events;
```

```
 presto:default> select count(distinct(user_id)) as distinct_users from all_events;
 distinct_users 
----------------
              5 
(1 row)

Query 20210802_040930_00006_rk9vj, FINISHED, 1 node
Splits: 3 total, 1 done (33.33%)
0:01 [2.44K rows, 14.1KB] [3.52K rows/s, 20.4KB/s]
```

### Query 2

##### How many weapons are there? How many events involve each?

Command: 
```
select count(*) as count, item from all_events group by item order by count(*) desc;
```

```
 presto:default> select count(*) as count, item from all_events group by item order by count(*) desc;
 count |   item   
-------+----------
  1220 | sword    
   707 | crossbow 
   512 | knife    
(3 rows)

Query 20210802_040807_00005_rk9vj, FINISHED, 1 node
Splits: 3 total, 0 done (0.00%)
0:00 [0 rows, 0B] [0 rows/s, 0B/s]
```

### Query 3

##### How many times did each user participate in each event? 

Command: 
```
select user_id, event_type, count(*) as count from all_events group by user_id, event_type order by count(*) desc;
```

```
presto:default> select user_id, event_type, count(*) as count from all_events group by user_id, event_type order by count(*) desc;
     user_id     | event_type | count 
-----------------+------------+-------
 agitated_darwin | buy        |   900 
 user9           | play_game  |   384 
 user7           | buy        |   384 
 user4           | buy        |   256 
 user5           | buy        |   256 
 agitated_darwin | sell       |   128 
 agitated_darwin | play_game  |   128 
 agitated_darwin | delete     |     3 
(8 rows)

Query 20210802_040614_00004_rk9vj, FINISHED, 1 node
Splits: 3 total, 1 done (33.33%)
0:00 [2.44K rows, 14.1KB] [5.22K rows/s, 30.3KB/s]
```

### Query 4

##### How many (and what kind) of weapons do players own?

Command: 
```
select user_id, item as weapon, count(*) as count from all_events group by user_id, item order by count(*) DESC;
```

```
presto:default> select user_id, item as weapon, count(*) as count from all_events group by user_id, item order by count(*) DESC;
     user_id     |  weapon  | count 
-----------------+----------+-------
 agitated_darwin | sword    |   452 
 agitated_darwin | crossbow |   451 
 user7           | sword    |   384 
 user9           | sword    |   384 
 agitated_darwin | knife    |   256 
 user4           | knife    |   256 
 user5           | crossbow |   256 
(7 rows)

Query 20210802_040511_00003_rk9vj, FINISHED, 1 node
Splits: 3 total, 2 done (66.67%)
0:01 [0 rows, 0B] [0 rows/s, 0B/s]
```


### Query 5

##### How many games were played with each kind of weapon?

Command: 
```
select user_id, count(*) as count, item as weapon from all_events where event_type = 'play_game' group by user_id, item;
```

```
presto:default> select user_id, count(*) as count, item as weapon from all_events where event_type = 'play_game' group by user_id, item;
     user_id     | count |  weapon  
-----------------+-------+----------
 user9           |   384 | sword    
 agitated_darwin |   128 | crossbow 
(2 rows)

Query 20210802_040328_00002_rk9vj, FINISHED, 1 node
Splits: 3 total, 1 done (33.33%)
0:06 [2.44K rows, 14.1KB] [433 rows/s, 2.51KB/s]
```
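
To experiment with these queries without the full stack, the same SQL can be tried on a small in-memory table. The sketch below uses Python's built-in `sqlite3` module rather than Presto, and the rows are invented for illustration, so the counts do not match the results above. An `order by` clause is added to Query 5 here only to make the row order predictable.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("create table all_events (user_id text, event_type text, item text)")
conn.executemany(
    "insert into all_events values (?, ?, ?)",
    [
        ("user7", "buy", "sword"),
        ("user9", "play_game", "sword"),
        ("user9", "play_game", "sword"),
        ("agitated_darwin", "play_game", "crossbow"),
    ],
)

# Query 1: number of distinct players.
distinct_users = conn.execute(
    "select count(distinct(user_id)) as distinct_users from all_events"
).fetchone()[0]

# Query 5: games played per user and weapon.
games = conn.execute(
    "select user_id, count(*) as count, item as weapon from all_events "
    "where event_type = 'play_game' group by user_id, item order by count(*) desc"
).fetchall()

print(distinct_users)
print(games)
```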

### Spark streaming reads data

- To start the Spark streaming read, run:

  **docker-compose exec spark spark-submit /w205/project-3-kumar-narayanan/filter_events_stream.py**

- While the program is waiting, feed it data:

  **docker-compose exec mids ab -n 50 -H "Host: agitated_darwin.comcast.net" http://localhost:5000/delete_item?item=crossbow**

  **docker-compose exec mids ab -n 50 -H "Host: user8.comcast.net" http://localhost:5000/sell_item?uid=user8%22item=knife**


- The output of this data feed is shown below:

```
+---------+---------+------+----+----------+----+----------+----+-------+
|raw_event|timestamp|Accept|Host|User-Agent|desc|event_type|item|user_id|
+---------+---------+------+----+----------+----+----------+----+-------+
+---------+---------+------+----+----------+----+----------+----+-------+

21/07/30 23:51:48 INFO StreamExecution: Streaming query made progress: {
  "id" : "77b9977e-f196-42bc-9475-f84b0eddfe65",
  "runId" : "b060215b-6201-4b48-9e64-730c075a833d",
  "name" : null,
  "timestamp" : "2021-07-30T23:51:47.491Z",
  "numInputRows" : 49,
  "inputRowsPerSecond" : 53.55191256830601,
  "processedRowsPerSecond" : 79.9347471451876,
  "durationMs" : {
    "addBatch" : 503,
    "getBatch" : 15,
    "getOffset" : 1,
    "queryPlanning" : 27,
    "triggerExecution" : 613,
    "walCommit" : 33
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[events]]",
    "startOffset" : {
      "events" : {
        "0" : 3237
      }
    },
    "endOffset" : {
      "events" : {
        "0" : 3286
      }
    },
    "numInputRows" : 49,
    "inputRowsPerSecond" : 53.55191256830601,
    "processedRowsPerSecond" : 79.9347471451876
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@6ba51f99"
  }
}
21/07/30 23:51:48 INFO StreamExecution: Streaming query made progress: {
  "id" : "77b9977e-f196-42bc-9475-f84b0eddfe65",
  "runId" : "b060215b-6201-4b48-9e64-730c075a833d",
  "name" : null,
  "timestamp" : "2021-07-30T23:51:48.167Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 2,
    "triggerExecution" : 2
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[events]]",
    "startOffset" : {
      "events" : {
        "0" : 3286
      }
    },
    "endOffset" : {
      "events" : {
        "0" : 3286
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@6ba51f99"
  }
}
21/07/30 23:51:58 INFO StreamExecution: Streaming query made progress: {
  "id" : "77b9977e-f196-42bc-9475-f84b0eddfe65",
  "runId" : "b060215b-6201-4b48-9e64-730c075a833d",
  "name" : null,
  "timestamp" : "2021-07-30T23:51:58.170Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 1,
    "triggerExecution" : 1
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[events]]",
    "startOffset" : {
      "events" : {
        "0" : 3286
      }
    },
    "endOffset" : {
      "events" : {
        "0" : 3286
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@6ba51f99"
  }
}
21/07/30 23:52:08 INFO StreamExecution: Streaming query made progress: {
  "id" : "77b9977e-f196-42bc-9475-f84b0eddfe65",
  "runId" : "b060215b-6201-4b48-9e64-730c075a833d",
  "name" : null,
  "timestamp" : "2021-07-30T23:52:08.173Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 1,
    "triggerExecution" : 1
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[events]]",
    "startOffset" : {
      "events" : {
        "0" : 3286
      }
    },
    "endOffset" : {
      "events" : {
        "0" : 3286
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@6ba51f99"
  }
}
21/07/30 23:52:18 INFO StreamExecution: Streaming query made progress: {
  "id" : "77b9977e-f196-42bc-9475-f84b0eddfe65",
  "runId" : "b060215b-6201-4b48-9e64-730c075a833d",
  "name" : null,
  "timestamp" : "2021-07-30T23:52:18.179Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 0,
    "triggerExecution" : 0
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[events]]",
    "startOffset" : {
      "events" : {
        "0" : 3286
      }
    },
    "endOffset" : {
      "events" : {
        "0" : 3286
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@6ba51f99"
  }
}

+---------+---------+------+----+----------+----+----------+----+-------+
|raw_event|timestamp|Accept|Host|User-Agent|desc|event_type|item|user_id|
+---------+---------+------+----+----------+----+----------+----+-------+
+---------+---------+------+----+----------+----+----------+----+-------+

21/07/30 23:53:11 INFO StreamExecution: Streaming query made progress: {
  "id" : "77b9977e-f196-42bc-9475-f84b0eddfe65",
  "runId" : "b060215b-6201-4b48-9e64-730c075a833d",
  "name" : null,
  "timestamp" : "2021-07-30T23:53:10.667Z",
  "numInputRows" : 48,
  "inputRowsPerSecond" : 67.13286713286713,
  "processedRowsPerSecond" : 117.64705882352942,
  "durationMs" : {
    "addBatch" : 293,
    "getBatch" : 22,
    "getOffset" : 0,
    "queryPlanning" : 37,
    "triggerExecution" : 408,
    "walCommit" : 44
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[events]]",
    "startOffset" : {
      "events" : {
        "0" : 3288
      }
    },
    "endOffset" : {
      "events" : {
        "0" : 3336
      }
    },
    "numInputRows" : 48,
    "inputRowsPerSecond" : 67.13286713286713,
    "processedRowsPerSecond" : 117.64705882352942
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@6ba51f99"
  }
}
21/07/30 23:53:11 INFO StreamExecution: Streaming query made progress: {
  "id" : "77b9977e-f196-42bc-9475-f84b0eddfe65",
  "runId" : "b060215b-6201-4b48-9e64-730c075a833d",
  "name" : null,
  "timestamp" : "2021-07-30T23:53:11.111Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 1,
    "triggerExecution" : 1
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[events]]",
    "startOffset" : {
      "events" : {
        "0" : 3336
      }
    },
    "endOffset" : {
      "events" : {
        "0" : 3336
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@6ba51f99"
  }
}
21/07/30 23:53:21 INFO StreamExecution: Streaming query made progress: {
  "id" : "77b9977e-f196-42bc-9475-f84b0eddfe65",
  "runId" : "b060215b-6201-4b48-9e64-730c075a833d",
  "name" : null,
  "timestamp" : "2021-07-30T23:53:21.116Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 0,
    "triggerExecution" : 0
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[events]]",
    "startOffset" : {
      "events" : {
        "0" : 3336
      }
    },
    "endOffset" : {
      "events" : {
        "0" : 3336
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@6ba51f99"
  }
}

```
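
Each progress report lists the Kafka offsets covered by the micro-batch, and in the logs above `numInputRows` matches the difference between `endOffset` and `startOffset`. The sketch below checks this for the two non-empty batches, using trimmed copies of their JSON in which only the fields needed here are kept. The helper name `rows_from_offsets` is made up for this example.

```python
import json

def rows_from_offsets(progress):
    """Sum endOffset - startOffset over every topic partition of every source."""
    total = 0
    for source in progress["sources"]:
        for topic, partitions in source["endOffset"].items():
            for partition, end in partitions.items():
                total += end - source["startOffset"][topic][partition]
    return total

# Trimmed copies of the two non-empty progress reports shown above.
first_batch = json.loads("""
{"numInputRows": 49,
 "sources": [{"startOffset": {"events": {"0": 3237}},
              "endOffset": {"events": {"0": 3286}}}]}
""")
second_batch = json.loads("""
{"numInputRows": 48,
 "sources": [{"startOffset": {"events": {"0": 3288}},
              "endOffset": {"events": {"0": 3336}}}]}
""")

for batch in (first_batch, second_batch):
    # Reported row count next to the count derived from the offsets.
    print(batch["numInputRows"], rows_from_offsets(batch))
```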

### State tracking - files grow as more events come in

- To write data in streaming mode, run:
  
  **docker-compose exec spark spark-submit /w205/project-3-kumar-narayanan/write_events_stream.py**

- While the program is waiting, feed it data as shown below:

      while true; do
        docker-compose exec mids \
          ab -n 10 -H "Host: agitateddarwin.comcast.com" \
            http://localhost:5000/play_game?weapon=crossbow

        sleep 5

        docker-compose exec mids \
          ab -n 10 -H "Host: user8.comcast.net" \
            http://localhost:5000/sell_item?uid=user8%22item=knife

        sleep 5
      done


- The listings below show the number of files growing as more data is fed.

```
(base) jupyter@w205-01:~/w205/project-3-kumar-jot-patrick$ docker-compose exec cloudera hadoop fs -ls /tmp/all_events
Found 4 items
drwxr-xr-x   - root supergroup          0 2021-07-30 23:46 /tmp/all_events/_spark_metadata
-rw-r--r--   1 root supergroup       3381 2021-07-30 23:46 /tmp/all_events/part-00000-441e730c-9e79-4dc3-ab58-187487200b63-c000.snappy.parquet
-rw-r--r--   1 root supergroup       3381 2021-07-30 23:46 /tmp/all_events/part-00000-48ab1ff9-7b73-4c0c-848b-4618e5f912b9-c000.snappy.parquet
-rw-r--r--   1 root supergroup        919 2021-07-30 23:45 /tmp/all_events/part-00000-782a6ce0-5dfd-48b4-8920-de191c629d94-c000.snappy.parquet
(base) jupyter@w205-01:~/w205/project-3-kumar-jot-patrick$ docker-compose exec cloudera hadoop fs -ls /tmp/all_events
Found 5 items
drwxr-xr-x   - root supergroup          0 2021-07-30 23:46 /tmp/all_events/_spark_metadata
-rw-r--r--   1 root supergroup       3381 2021-07-30 23:46 /tmp/all_events/part-00000-441e730c-9e79-4dc3-ab58-187487200b63-c000.snappy.parquet
-rw-r--r--   1 root supergroup       3381 2021-07-30 23:46 /tmp/all_events/part-00000-48ab1ff9-7b73-4c0c-848b-4618e5f912b9-c000.snappy.parquet
-rw-r--r--   1 root supergroup        919 2021-07-30 23:45 /tmp/all_events/part-00000-782a6ce0-5dfd-48b4-8920-de191c629d94-c000.snappy.parquet
-rw-r--r--   1 root supergroup       3379 2021-07-30 23:46 /tmp/all_events/part-00000-c974a689-d787-4602-8a78-9425556f1fbe-c000.snappy.parquet
(base) jupyter@w205-01:~/w205/project-3-kumar-jot-patrick$ docker-compose exec cloudera hadoop fs -ls /tmp/all_events
Found 6 items
drwxr-xr-x   - root supergroup          0 2021-07-30 23:46 /tmp/all_events/_spark_metadata
-rw-r--r--   1 root supergroup       3381 2021-07-30 23:46 /tmp/all_events/part-00000-441e730c-9e79-4dc3-ab58-187487200b63-c000.snappy.parquet
-rw-r--r--   1 root supergroup       3381 2021-07-30 23:46 /tmp/all_events/part-00000-48ab1ff9-7b73-4c0c-848b-4618e5f912b9-c000.snappy.parquet
-rw-r--r--   1 root supergroup       3378 2021-07-30 23:46 /tmp/all_events/part-00000-7452b8f0-85fb-42a0-a5fe-5371b5b09514-c000.snappy.parquet
-rw-r--r--   1 root supergroup        919 2021-07-30 23:45 /tmp/all_events/part-00000-782a6ce0-5dfd-48b4-8920-de191c629d94-c000.snappy.parquet
-rw-r--r--   1 root supergroup       3379 2021-07-30 23:46 /tmp/all_events/part-00000-c974a689-d787-4602-8a78-9425556f1fbe-c000.snappy.parquet
(base) jupyter@w205-01:~/w205/project-3-kumar-jot-patrick$ docker-compose exec cloudera hadoop fs -ls /tmp/all_events
Found 7 items
drwxr-xr-x   - root supergroup          0 2021-07-30 23:46 /tmp/all_events/_spark_metadata
-rw-r--r--   1 root supergroup       3381 2021-07-30 23:46 /tmp/all_events/part-00000-441e730c-9e79-4dc3-ab58-187487200b63-c000.snappy.parquet
-rw-r--r--   1 root supergroup       3381 2021-07-30 23:46 /tmp/all_events/part-00000-48ab1ff9-7b73-4c0c-848b-4618e5f912b9-c000.snappy.parquet
-rw-r--r--   1 root supergroup        919 2021-07-30 23:46 /tmp/all_events/part-00000-49d98d0b-eff6-4ca6-a744-62f63e5fdff7-c000.snappy.parquet
-rw-r--r--   1 root supergroup       3378 2021-07-30 23:46 /tmp/all_events/part-00000-7452b8f0-85fb-42a0-a5fe-5371b5b09514-c000.snappy.parquet
-rw-r--r--   1 root supergroup        919 2021-07-30 23:45 /tmp/all_events/part-00000-782a6ce0-5dfd-48b4-8920-de191c629d94-c000.snappy.parquet
-rw-r--r--   1 root supergroup       3379 2021-07-30 23:46 /tmp/all_events/part-00000-c974a689-d787-4602-8a78-9425556f1fbe-c000.snappy.parquet
(base) jupyter@w205-01:~/w205/project-3-kumar-jot-patrick$ docker-compose exec cloudera hadoop fs -ls /tmp/all_events
Found 8 items
drwxr-xr-x   - root supergroup          0 2021-07-30 23:47 /tmp/all_events/_spark_metadata
-rw-r--r--   1 root supergroup       3381 2021-07-30 23:46 /tmp/all_events/part-00000-441e730c-9e79-4dc3-ab58-187487200b63-c000.snappy.parquet
-rw-r--r--   1 root supergroup       3381 2021-07-30 23:46 /tmp/all_events/part-00000-48ab1ff9-7b73-4c0c-848b-4618e5f912b9-c000.snappy.parquet
-rw-r--r--   1 root supergroup        919 2021-07-30 23:46 /tmp/all_events/part-00000-49d98d0b-eff6-4ca6-a744-62f63e5fdff7-c000.snappy.parquet
-rw-r--r--   1 root supergroup       3382 2021-07-30 23:47 /tmp/all_events/part-00000-5989a8bb-a2af-4f31-a725-44c2ce0870f3-c000.snappy.parquet
-rw-r--r--   1 root supergroup       3378 2021-07-30 23:46 /tmp/all_events/part-00000-7452b8f0-85fb-42a0-a5fe-5371b5b09514-c000.snappy.parquet
-rw-r--r--   1 root supergroup        919 2021-07-30 23:45 /tmp/all_events/part-00000-782a6ce0-5dfd-48b4-8920-de191c629d94-c000.snappy.parquet
-rw-r--r--   1 root supergroup       3379 2021-07-30 23:46 /tmp/all_events/part-00000-c974a689-d787-4602-8a78-9425556f1fbe-c000.snappy.parquet

```
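
One way to quantify the growth is to count the Parquet part files and sum their sizes in each listing. The sketch below parses `hadoop fs -ls` output as plain text and is run here on the first listing above. The function name `summarize_listing` is an assumption for this example, and the parsing relies on the eight-column layout seen in these listings.

```python
def summarize_listing(listing):
    """Return (file_count, total_bytes) for the .parquet entries in `hadoop fs -ls` output."""
    count, total = 0, 0
    for line in listing.splitlines():
        fields = line.split()
        # File rows have 8 fields: permissions, replication, owner, group,
        # size, date, time, path. Directories and the header are skipped.
        if len(fields) == 8 and fields[0].startswith("-") and fields[7].endswith(".parquet"):
            count += 1
            total += int(fields[4])
    return count, total

first_listing = """\
Found 4 items
drwxr-xr-x   - root supergroup          0 2021-07-30 23:46 /tmp/all_events/_spark_metadata
-rw-r--r--   1 root supergroup       3381 2021-07-30 23:46 /tmp/all_events/part-00000-441e730c-9e79-4dc3-ab58-187487200b63-c000.snappy.parquet
-rw-r--r--   1 root supergroup       3381 2021-07-30 23:46 /tmp/all_events/part-00000-48ab1ff9-7b73-4c0c-848b-4618e5f912b9-c000.snappy.parquet
-rw-r--r--   1 root supergroup        919 2021-07-30 23:45 /tmp/all_events/part-00000-782a6ce0-5dfd-48b4-8920-de191c629d94-c000.snappy.parquet
"""

print(summarize_listing(first_listing))  # prints (3, 7681)
```

Running the same function on each later listing would show the file count and total size increasing batch by batch.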