<p align="center">
<img src ="https://raw.githubusercontent.com/microsoft/azuredatastudio/master/src/sql/media/microsoft_logo_gray.svg?sanitize=true" width="250" align="center">
</p>

# **Twitter Streaming with SQL Server & Spark**

In this notebook, we will go through the process of using Spark to stream tweets from the Twitter API, and then stream the resulting data into the SQL Server data pool. Once the data is in the data pool, we will perform queries on it using T-SQL or the Spark-SQL connector. 

## **Steps**
1. [Create a Twitter Developer Account](https://developer.twitter.com/en/apply-for-access.html).
2. Setup
    1. Create 'TwitterData' database.
    2. Create an External Data Source 'TweetsDataSource'.
    3. Create an External Table 'Tweets'.
    4. Change kernel from "SQL" to "Spark | Scala".
    5. Import packages.
    6. Enter required parameters.
3. Define and create a TwitterStream object.
4. Start the TwitterStream.
5. Validate streaming data.
6. Stream data into SQL Server data pool.
7. Query the data from the data pool external table using T-SQL or the Spark-SQL connector.
8. Stop the TwitterStream

## **1. Create a Twitter Developer Account**
[Create a Twitter Developer Account](https://developer.twitter.com/en/apply-for-access.html) and enter the credentials in the Setup section below. These credentials will be used to authenticate the application with Twitter and allow you to stream data from the platform.

## **2. Setup**
1. Create a database in the SQL Server master instance named 'TwitterData'.
2. Create an External Data Source to the Data Pool named 'TweetsDataSource'.
3. Create an External Table in the Data Pool named 'Tweets'.
4. Change the Kernel from "SQL" to "Spark | Scala".
5. Import Java packages.
6. Specify setup parameters

### **2.1 Create TwitterData Database and Retrieve Hostname**

In [0]:
USE master
IF EXISTS(select * from sys.databases where name='TwitterData')
DROP DATABASE TwitterData;
GO
CREATE DATABASE TwitterData;
GO
USE TwitterData;
GO

Use the following cell to check your server name. If it returns ```master-0```, your hostname will be ```master-0.master-svc```. You will define this value in the Parameters section.

In [0]:
SELECT @@Servername;

### **2.2 Create External Data Source 'TweetsDataSource'**

In [0]:
USE TwitterData
GO

IF NOT EXISTS(SELECT * FROM sys.external_data_sources WHERE name = 'TweetsDataSource')
  CREATE EXTERNAL DATA SOURCE TweetsDataSource
  WITH (LOCATION = 'sqldatapool://controller-svc/default');

### **2.3 Create External Table 'Tweets'**

In [0]:
IF NOT EXISTS(SELECT * FROM sys.external_tables WHERE name = 'Tweets')
   CREATE EXTERNAL TABLE [Tweets]
   ("screen_name" NVARCHAR(MAX), "createdAt" DATETIME , "num_followers" BIGINT, "text" NVARCHAR(MAX))
   WITH
   (
      DATA_SOURCE = TweetsDataSource,
      DISTRIBUTION = ROUND_ROBIN
   );

### **2.4 Change the kernel from "SQL" to "Spark | Scala"**
At the top of the editor, click the Kernel dropdown menu and change the kernel from "SQL" to "Spark | Scala". This will update the notebook language, and allow you to proceed with the next steps.

### **2.5 Import packages**

In [0]:
import java.io.{BufferedReader, File, FileNotFoundException, InputStream, InputStreamReader}
import java.net.URLEncoder
import java.nio.charset.StandardCharsets
import java.util.Base64
import javax.crypto.Mac
import javax.crypto.spec.SecretKeySpec
import scala.collection.JavaConverters._
import org.apache.commons.io.IOUtils
import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.CloseableHttpClient
import org.apache.http.impl.client.HttpClients
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import java.io.PrintWriter
import org.apache.spark.sql.{SparkSession, SaveMode, Row, DataFrame}

### **2.6 Parameters**
Enter the required parameters for the Spark streaming job to connect to SQL Server.

In this example, the connection is made from Spark to the SQL Server master instance using the internal DNS name (Ex: master-0.master-svc) and port (1433).  Alternatively, and especially if you are using a highly available Always On Availability Group, you can connect to the Kubernetes service that exposes the primary node of the Always On Availability Group.

####  Parameters needed to create Twitter stream
- Twitter API authentication keys

#### Parameters needed for Spark-SQL connector:
- user
- password

#### Optional parameters:
- hostname
- port 
- CSV schema
- Source directory location
- Set path in hdfs to store tweets
- Set saving interval for file creation 
- Set Twitter filters


In [0]:
// Twitter app autnentication keys
val consumerKey = ""
val consumerSecret = ""
val accessToken = ""
val accessTokenSecret = ""

// Provide username/password to SQL Server master instance
val user = "user"
val password = "password"

// Spark-SQL connector parameters
val hostname = "master-0.master-svc"
val port = 1433
val database = "TwitterData"
val url = s"jdbc:sqlserver://${hostname}:${port};database=${database};user=${user};password=${password};"
val dbtable = "Tweets"
val datasource_name = "TweetsDataSource"

// Twitter stream object parameters
val filters = Array("tennis", "nadal", "federer", "murray", "djokovic")
val path = "/user/twitter/"
val savingInterval = 2000

## **3. Define and Create TwitterStream object**

In [0]:
class TwitterStream(
  consumerKey: String,
  consumerSecret: String,
  accessToken: String,
  accessTokenSecret: String,
  path: String,
  savingInterval: Long,
  filters: Array[String]) {
  
  private val threadName = "tweet-downloader"
  
  {
    val hasActiveStream = Thread.getAllStackTraces().keySet().asScala.map(_.getName).contains(threadName)
    if (hasActiveStream) {
      throw new RuntimeException(
        "There is already an active stream that writes tweets to the configured path. " +
        "Please stop the existing stream first (using twitterStream.stop()).")
    }
  }
  
  @volatile private var thread: Thread = null
  @volatile private var isStopped = false
  @volatile var isDownloading = false
  @volatile var exception: Throwable = null

  private var httpclient: CloseableHttpClient = null
  private var input: InputStream = null
  private var httpGet: HttpGet = null
  
  private def encode(string: String): String = {
    URLEncoder.encode(string, StandardCharsets.UTF_8.name)
  }

  def start(): Unit = synchronized {
    isDownloading = false
    isStopped = false
    thread = new Thread(threadName) {
      override def run(): Unit = {
        httpclient = HttpClients.createDefault()
        try {
          requestStream(httpclient)
        } catch {
          case e: Throwable => exception = e
        } finally {
          //TwitterStream.this.stop()
        }
      }
    }
    thread.start()
  }

  private def requestStream(httpclient: CloseableHttpClient): Unit = {
    val url = "https://stream.twitter.com/1.1/statuses/filter.json"
    val timestamp = System.currentTimeMillis / 1000
    val nonce = timestamp + scala.util.Random.nextInt
    val oauthNonce = nonce.toString
    val oauthTimestamp = timestamp.toString

    val oauthHeaderParams = List(
      "oauth_consumer_key" -> encode(consumerKey),
      "oauth_signature_method" -> encode("HMAC-SHA1"),
      "oauth_timestamp" -> encode(oauthTimestamp),
      "oauth_nonce" -> encode(oauthNonce),
      "oauth_token" -> encode(accessToken),
      "oauth_version" -> "1.0"
    )
    val requestParams = List(
      "track" -> encode(filters.mkString(","))
    )

    val parameters = (oauthHeaderParams ++ requestParams).sortBy(_._1).map(pair => s"""${pair._1}=${pair._2}""").mkString("&")
    val base = s"GET&${encode(url)}&${encode(parameters)}"
    val oauthBaseString: String = base.toString
    val signature = generateSignature(oauthBaseString)
    val oauthFinalHeaderParams = oauthHeaderParams ::: List("oauth_signature" -> encode(signature))
    val authHeader = "OAuth " + ((oauthFinalHeaderParams.sortBy(_._1).map(pair => s"""${pair._1}="${pair._2}"""")).mkString(", "))

    httpGet = new HttpGet(s"https://stream.twitter.com/1.1/statuses/filter.json?${requestParams.map(pair => s"""${pair._1}=${pair._2}""").mkString("&")}")
    httpGet.addHeader("Authorization", authHeader)
    println("Downloading tweets!")
    val response = httpclient.execute(httpGet)
    val entity = response.getEntity()
    input = entity.getContent()
    if (response.getStatusLine.getStatusCode != 200) {
      throw new RuntimeException(IOUtils.toString(input, StandardCharsets.UTF_8))
    }
    isDownloading = true
    val reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8))
    var line: String = null
    var lineno = 1
    line = reader.readLine()
    var lastSavingTime = System.currentTimeMillis()
    val s = new StringBuilder()
   
    val conf = new Configuration()
    val fs= FileSystem.get(conf)
                                                                      
    while (line != null && !isStopped) {
      lineno += 1
      line = reader.readLine()
      s.append(line + "\n")
      val now = System.currentTimeMillis()
      if (now - lastSavingTime >= savingInterval) {
          
         val df = spark.read.json(spark.sparkContext.parallelize(Seq(s.toString)))
         df.write.json(path + now.toString)
          
        lastSavingTime = now
        s.clear()
      }
    }
  }

  private def generateSignature(data: String): String = {
    val mac = Mac.getInstance("HmacSHA1")
    val oauthSignature = encode(consumerSecret) + "&" + encode(accessTokenSecret)
    val spec = new SecretKeySpec(oauthSignature.getBytes, "HmacSHA1")
    mac.init(spec)
    val byteHMAC = mac.doFinal(data.getBytes)
    return Base64.getEncoder.encodeToString(byteHMAC)
  }

  def stop(): Unit = synchronized {
    isStopped = true
    isDownloading = false
    try {
      if (httpGet != null) {
        httpGet.abort()
        httpGet = null
      }
      if (input != null) {
        input.close()
        input = null
      }
      if (httpclient != null) {
        httpclient.close()
        httpclient = null
      }
      if (thread != null) {
        thread.interrupt()
        thread = null
      }
    } catch {
      case _: Throwable =>
    }
  }
}
println("class defined")

In [0]:
val twitterStream = new TwitterStream(consumerKey, consumerSecret, accessToken, accessTokenSecret, path, savingInterval, filters)

## **4. Start TwitterStream**

In [0]:
twitterStream.start()

if (twitterStream.exception != null) { throw twitterStream.exception }

## **5. Validate streaming data**

Refresh the /user/twitter directory in HDFS and you should see some directories containing data files with the Tweet data.

## **6. Stream data into SQL Server data pool**

Now, we can start another job to stream the incoming data into the SQL Server data pool. We will first create the Tweet Dataframe, and then write the data from the dataframe to an external table over the data pool using the Spark-SQL connector.

### **Create Tweet Dataframe**

In [0]:
val tweets = spark.read.json(path + "*")
val tweets_schema = tweets.schema

val tweetStream = spark.readStream.
|schema(tweets_schema).
|json(path + "*").
|filter($"lang" === "en").
|withColumn("screen_name", $"user.screen_name").
|withColumn("num_followers", $"user.followers_count").
|withColumn("createdAt", from_utc_timestamp(from_unixtime(unix_timestamp($"created_at", "EEE MMM dd HH:mm:ss ZZZZ yyyy")),"EST")).
|select("screen_name","createdAt","num_followers", "text") 

### **Write data to an external table using the Spark-SQL Connector**

In [0]:
val query = tweetStream.writeStream.outputMode("append").foreachBatch{ (batchDF: DataFrame, batchId: Long) => 
                batchDF.write
                    .format("com.microsoft.sqlserver.jdbc.spark")
                    .mode("append")
                    .option("url", url)
                    .option("dbtable", dbtable)
                    .option("user", user)
                    .option("password", password)
                    .option("dataPoolDataSource",datasource_name).save()
               }.start()
query.processAllAvailable()
//query.awaitTermination(40000)

## **7. Query the data from the data pool external table using T-SQL or the Spark-SQL connector**
Now, you are streaming data from the source HDFS directory to the data pool table. An external table has been created in the targeted database specified above. You can view the table in the explorer tree and query it using T-SQL. 

If you want to view the current count of records in the external table, use the code below that uses the Spark-SQL connector to query data from SQL Server into a data frame.

You can continue to add files to the /user/twitter directory to see that the Spark-SQL connector automatically picks up new records and adds them to the data pool table.

In [0]:
def df_read(dbtable: String,
                url: String,
                dataPoolDataSource: String=""): DataFrame = {
                spark.read
                        .format("com.microsoft.sqlserver.jdbc.spark")
                        .option("url", url)
                        .option("dbtable", dbtable)
                        .option("user", user)
                        .option("password", password)
                        .option("dataPoolDataSource", datasource_name)
                        .load()
                }

In [0]:
val new_df = df_read(dbtable, url, dataPoolDataSource=datasource_name)
new_df.count

## **8. Stop the TwitterStream**

In [0]:
twitterStream.stop()