
## Running Your Queries In Spark
You need to take the data from Foursquare and perform your analysis based on the question you chose.

In the example below, we do the following:

1. Read the files that our Foursquare client writes to Google Drive.
2. For each city, get the trending venue categories and the number of people currently there.
3. Add up the numbers for the same category.

You can extend this into a web dashboard, or into plots inside this notebook.
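The three steps above can be sketched in plain Python, without Spark, to make the data flow concrete. This is a minimal simulation under stated assumptions:

- Each log line is assumed to be `category,count`, the format the parsing code later in this notebook expects.
- Each inner list stands in for one 10-second batch.
- The sample categories and numbers are made up for illustration.

`sum_batch` plays the role of `reduceByKey`, and `run_batches` plays the role of `updateStateByKey`.

```python
from collections import Counter


def sum_batch(lines):
    """Add up the counts for the same category within one batch (like reduceByKey)."""
    batch_totals = Counter()
    for line in lines:
        fields = line.split(",")
        batch_totals[fields[0]] += int(fields[1])
    return batch_totals


def run_batches(batches):
    """Fold each batch into a running total (like updateStateByKey)."""
    running = Counter()
    snapshots = []
    for lines in batches:
        running.update(sum_batch(lines))  # Counter.update adds counts
        snapshots.append(dict(running))
    return snapshots


# Hypothetical sample data: two batches of "category,count" lines
batches = [
    ["Coffee Shop,12", "Park,5", "Coffee Shop,3"],
    ["Park,4", "Museum,7"],
]
for i, snapshot in enumerate(run_batches(batches), start=1):
    ranked = sorted(snapshot.items(), key=lambda kv: kv[1], reverse=True)
    print(f"after batch {i}:", ranked)
```

Sorting each snapshot by count gives a ranked list of categories. A dashboard or plot could consume that list directly.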

In [1]:

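# Install Java and Apache Spark
# (old Spark releases are kept on the Apache archive server)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz

# Set location for installation:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

# findspark puts the Spark installation in SPARK_HOME on sys.path, so the
# pyspark imports below match the downloaded Spark 2.4.4 instead of
# whatever pyspark version pip would install
!pip install -q findspark
import findspark
findspark.init()

# Mount Google Drive
from google.colab import drive
drive.mount('/content/gdrive')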

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [0]:
# Import relevant modules
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession

In [0]:
# Delete all the log files inside the foursquare_logs directory
# (subdirectories are left in place)
import os
folder = '/content/gdrive/My Drive/Colab Datasets/foursquare_logs'
for file_name in os.listdir(folder):
    file_path = os.path.join(folder, file_name)
    try:
        if os.path.isfile(file_path):
            os.unlink(file_path)
    except OSError as e:
        print(e)

In [0]:
# Update function for updateStateByKey: adds this batch's counts for a
# category to the running total kept from previous batches
def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)
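To see what Spark passes to `updateFunction`, the sketch below calls it by hand, the way `updateStateByKey` does for a single key:

- `newValues` is the list of values that arrived for that key in the current batch. After `reduceByKey`, it holds at most one number.
- `runningCount` is the previous state, or `None` the first time the key is seen.

The batch values are illustrative only.

```python
def updateFunction(newValues, runningCount):
    # Same logic as the notebook cell: start from 0 for a new key,
    # then add this batch's values to the running total
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)


# Simulated state for one category across three batches (illustrative numbers)
state = None
for batch_values in ([15], [], [4]):
    # An empty list models a batch in which the category did not appear:
    # Spark still calls the function for keys it already tracks,
    # so the total is carried forward unchanged
    state = updateFunction(batch_values, state)
    print(batch_values, "->", state)
```

Returning `None` from the update function would remove the key from the state. This version never does that, so every category that has appeared once stays in the running totals.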

In [0]:
# create spark configuration
conf = SparkConf()
conf.setAppName("FoursquareStreamApp")

# create spark context with the above configuration
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("ERROR")

# create the StreamingContext from the above SparkContext with a
# batch interval of 10 seconds
ssc = StreamingContext(sc, 10)

# set a checkpoint directory; updateStateByKey requires checkpointing
# so that the running state can be recovered
ssc.checkpoint("checkpoint_FoursquareApp")

# monitor the Drive folder and read each new log file as lines of text
dataStream = ssc.textFileStream('/content/gdrive/My Drive/Colab Datasets/foursquare_logs')
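`textFileStream` picks up new files in the monitored folder. Its PySpark documentation asks that files be written elsewhere on the same file system and then moved into the folder, and it notes that file names starting with `.` are ignored.

To test the pipeline without the Foursquare client, a helper along these lines could drop a batch of lines into the folder. It is a hypothetical sketch: `drop_log_file`, the staging directory and the `category,count` line format are assumptions, not part of the original notebook.

```python
import os
import tempfile


def drop_log_file(lines, watched_dir, staging_dir):
    """Write `lines` to a new file in staging_dir, then move it into watched_dir.

    Hypothetical test helper. staging_dir should be outside watched_dir but on
    the same file system: os.replace only works within one file system,
    where it is a rename.
    """
    os.makedirs(staging_dir, exist_ok=True)
    # mkstemp creates a uniquely named file such as "tmpk3j9x2.txt";
    # the name does not start with ".", so Spark will not skip it
    fd, tmp_path = tempfile.mkstemp(dir=staging_dir, suffix=".txt")
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        f.write("\n".join(lines) + "\n")
    final_path = os.path.join(watched_dir, os.path.basename(tmp_path))
    os.replace(tmp_path, final_path)
    return final_path
```

For example, call `drop_log_file(["Coffee Shop,12", "Park,5"], folder, '/content/gdrive/My Drive/Colab Datasets/foursquare_staging')` while the stream is running. The staging path here is made up.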

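### Finally, we implement our primary workflow
The workflow processes each 10-second batch of log lines in three steps:

1. Split each line into a `(category, count)` pair.
2. Sum the counts per category with `reduceByKey`.
3. Add the sums to the running totals from earlier batches with `updateStateByKey`.

Once the workflow is defined, we start the stream with `ssc.start()`. `ssc.awaitTermination()` then blocks until the streaming context is stopped. Here, the cell was interrupted manually.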

In [6]:
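# Parse each log line ("category,count") into a (category, count) pair
# and sum the counts for the same category within the batch
visitor_counts = (dataStream
                  .map(lambda line: line.split(","))
                  .map(lambda fields: (fields[0], int(fields[1])))
                  .reduceByKey(lambda a, b: a + b))

# Add each batch's counts to the running totals from previous batches
runningCounts = visitor_counts.updateStateByKey(updateFunction)

# Print the first elements of the running totals after every batch
runningCounts.pprint()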

# start the streaming computation
ssc.start()

# wait for the streaming to finish
ssc.awaitTermination()

-------------------------------------------
Time: 2019-11-11 20:00:00
-------------------------------------------

-------------------------------------------
Time: 2019-11-11 20:00:10
-------------------------------------------

-------------------------------------------
Time: 2019-11-11 20:00:20
-------------------------------------------

-------------------------------------------
Time: 2019-11-11 20:00:30
-------------------------------------------

-------------------------------------------
Time: 2019-11-11 20:00:40
-------------------------------------------

-------------------------------------------
Time: 2019-11-11 20:00:50
-------------------------------------------

-------------------------------------------
Time: 2019-11-11 20:01:00
-------------------------------------------

-------------------------------------------
Time: 2019-11-11 20:01:10
-------------------------------------------

-------------------------------------------
Time: 2019-11-11 20:01:20
----------

KeyboardInterrupt: ignored
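The `map` in the workflow cell assumes every line has the form `category,count`. A single blank or malformed line raises an exception in the Spark job for that batch.

One possible hardening is a parser that returns an empty list for bad lines and is used with `flatMap`, which drops them. This is a sketch under that assumption, not the notebook's method, and `parse_line` is a hypothetical name.

```python
def parse_line(line):
    """Return [(category, count)] for a well-formed line, or [] otherwise.

    Hypothetical helper: like the notebook's map, it takes field 0 as the
    category and field 1 as an integer count, ignoring any further fields.
    """
    fields = line.split(",")
    if len(fields) < 2:
        return []  # blank line or no comma
    try:
        return [(fields[0], int(fields[1]))]
    except ValueError:
        return []  # count is not an integer


# Possible use in the workflow cell (requires the Spark objects defined above):
# visitor_counts = dataStream.flatMap(parse_line).reduceByKey(lambda a, b: a + b)

for line in ["Coffee Shop,12", "Park,not-a-number", "", "Museum,7,extra"]:
    print(repr(line), "->", parse_line(line))
```

Because `flatMap` flattens each returned list, a bad line contributes nothing to the batch instead of failing it. Skipping input silently can hide upstream problems, so counting or logging rejected lines may be worth adding.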

In [0]:
# Stop the streaming context (by default this also stops the SparkContext)
ssc.stop()