# TwitterTrends-3-KafkaToMongoDB

## Importing libraries

In [1]:
import $ivy.`org.apache.spark::spark-sql:2.2.0`
import $ivy.`org.mongodb.spark::mongo-spark-connector:2.2.0`

In [2]:
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession

import com.mongodb.spark._

import org.apache.log4j.PropertyConfigurator

## Creating Spark Session

*Note: As stated in the [readme](https://github.com/rvilla87/Big-Data#some-things-to-consider), we will change the log level to WARN.*

In [3]:
PropertyConfigurator.configure("C:/spark/conf/log4j.properties") // load spark's log4j configuration (set to WARN)
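
If the properties file is not available at that path, a similar effect can be approximated programmatically with the log4j 1.x API that ships with Spark 2.2.0. This is only a sketch of an alternative, not what this notebook does; the choice of the `org` and `akka` logger names is an assumption about where most of Spark's log output comes from.

```scala
import org.apache.log4j.{Level, Logger}

// Assumption: raising these two parent loggers to WARN silences most
// of Spark's INFO output; adjust the names to your environment.
Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)
```

Unlike the properties file, this only changes levels and leaves appenders and layouts untouched.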

In [4]:
val spark = SparkSession.builder()
  .appName("TwitterKafkaToMongoDB")
  .master("local[*]")
  .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/TwitterTrends.trends")
  .getOrCreate()

spark: SparkSession = org.apache.spark.sql.SparkSession@4ce4d369

## [Spark Structured Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)

As stated in [TwitterTrends-2-FileToKafka](TwitterTrends-2-FileToKafka.ipynb), we have to define a streaming DataFrame and a `StreamingQuery` in order to read from Kafka and stream to [MongoDB](https://www.mongodb.com/).

To create the streaming DataFrame we could use the [Spark Connector](https://jira.mongodb.org/browse/SPARK), but at the moment the [Spark Connector doesn't support structured streaming](https://jira.mongodb.org/browse/SPARK-85). So in this example we won't stream the data with the `readStream` method; instead we will load all the files with the `read` method.

In [5]:
val kafkaSchema = new StructType().add("key", "String").add("value", "String")

kafkaSchema: StructType = StructType(StructField(key,StringType,true), StructField(value,StringType,true))
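
For comparison, the same two-column schema can also be written with explicit `StructField`s instead of type-name strings. This is a sketch of an equivalent form; the name `explicitKafkaSchema` is hypothetical and is not used elsewhere in the notebook. It assumes `spark-sql` is already on the classpath, as imported in the first cell.

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Explicit form of the key/value schema: two nullable string columns.
val explicitKafkaSchema = StructType(Seq(
  StructField("key", StringType, nullable = true),
  StructField("value", StringType, nullable = true)
))
```

The string-based `add` is shorter; the explicit form lets the compiler catch a misspelled type name.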

In [TwitterTrends-1-TrendsToFile](TwitterTrends-1-TrendsToFile.ipynb) we defined many fields, and we want to reproduce the same structure in MongoDB. To do so we have to cast each field to its required type:

In [8]:
// TODO: Implement Structured Streaming into MongoDB with Spark Connector when supported
//   readStream from Kafka
//   writeStream to MongoDB

// Workaround: use the DataFrame `read` method (not streaming)
val fileDF = spark
  .read
  .schema(kafkaSchema)
  .option("sep", ";")
  .option("dateFormat", "dd/MM/yyyy")
  .csv("../datasets/trendFiles/trends_*.csv")
  .selectExpr(
    "CAST(key as String)",
    "split(value, '[|]')[0] as country",
    "split(value, '[|]')[1] as city",
    "CAST(split(value, '[|]')[2] as Int) as woeid",
    "CAST(split(value, '[|]')[3] as Double) as lon",
    "CAST(split(value, '[|]')[4] as Double) as lat",
    "TO_TIMESTAMP(CONCAT(split(value, '[|]')[6], ' ', split(value, '[|]')[5]), 'dd/MM/yyyy HH:mm:ss') as date",
    "split(value, '[|]')[7] as trends"
  )

fileDF: org.apache.spark.sql.package.DataFrame = [key: string, country: string ... 6 more fields]

We check that we have the desired schema:

In [8]:
fileDF.printSchema

root
 |-- key: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- woeid: integer (nullable = true)
 |-- lon: double (nullable = true)
 |-- lat: double (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- trends: string (nullable = true)
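
To make the column expressions above concrete, here is a plain-Scala sketch (no Spark needed) of what they do to a single `value` string. The `Trend` case class, the `parseValue` helper and the sample line are hypothetical and only for illustration; the field order is taken from the `selectExpr` call.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Hypothetical record type mirroring the columns derived from `value`.
final case class Trend(
  country: String,
  city: String,
  woeid: Int,
  lon: Double,
  lat: Double,
  date: LocalDateTime,
  trends: String
)

// Same pattern as in TO_TIMESTAMP: day/month/year, then a 24-hour time.
val timestampFormat = DateTimeFormatter.ofPattern("dd/MM/yyyy HH:mm:ss")

def parseValue(value: String): Trend = {
  // "[|]" is a regex character class matching a literal pipe;
  // a bare "|" would be interpreted as regex alternation.
  val f = value.split("[|]")
  Trend(
    country = f(0),
    city    = f(1),
    woeid   = f(2).toInt,
    lon     = f(3).toDouble,
    lat     = f(4).toDouble,
    // Field 6 holds the date and field 5 the time, as in the CONCAT call.
    date    = LocalDateTime.parse(s"${f(6)} ${f(5)}", timestampFormat),
    trends  = f(7)
  )
}

// Made-up sample line, for illustration only.
val sample = "Spain|Barcelona|753692|2.17|41.38|18:30:00|25/12/2017|#trendA,#trendB"
val parsed = parseValue(sample)
```

One difference to keep in mind: Spark's `CAST` yields `null` for a malformed number, whereas `toInt` and `toDouble` in this sketch throw an exception.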



### Insert documents from dataframes into MongoDB

Finally, we just have to add every message from Kafka to MongoDB.

*Note: Before executing next statement make sure you have [started MongoDB Server](https://github.com/rvilla87/Big-Data#starting-mongodb-server).*

In [9]:
MongoSpark.save(fileDF.write.option("collection", "trends").mode("append"))