This sample shows the integration between Infinispan and Spark, including RDD operations, writing from RDDs, Spark SQL, and Spark Streaming.
Build the project
Make sure the project is built so that the job fat jar is generated. From the project root, type:
./sbt examplesTwitter/assembly
Launch a cluster
Make sure Docker and docker-compose are installed.
To launch the Spark standalone cluster and the Infinispan cluster, run the script:
Obtain Twitter OAuth credentials
Since the Spark Twitter connector requires authentication, OAuth credentials must be obtained from the Twitter account holder by following the instructions at https://dev.twitter.com/oauth/overview/application-owner-access-tokens
1. Populate the cache
The first job, StreamConsumer, creates a Twitter DStream, applies a transformation to each tweet, and saves the results to Infinispan.
To run the job:
./run-job.sh -m org.infinispan.spark.examples.twitter.StreamConsumerJava -c <twitter.consumerKey> -k <twitter.consumerSecret> -t <twitter.accessToken> -s <twitter.accessTokenSecret>
Use org.infinispan.spark.examples.twitter.StreamConsumerScala to run the Scala version.
After a few seconds, the job prints the number of tweets inserted in the cache and the last tweet stored, for example:
3262 tweets inserted in the cache
Last tweet:RT @Teeeeeago: Gostei de um vídeo @YouTube de @chefejinbe http://t.co/dfEpYK1QUm #Bejiin - ResourcePacks #4
To stop the job, press CTRL+C.
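The transform-and-save step of StreamConsumer can be pictured in plain Java, without Spark. This is a hypothetical sketch, not the example's source: the Tweet class, the toEntry and saveBatch helpers, and the use of "N/A" for tweets without a location are assumptions (the "N/A" value mirrors the filter used later by the SQL job), and a ConcurrentHashMap stands in for the Infinispan cache.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Plain-Java sketch of the StreamConsumer idea: transform each tweet of a
// micro-batch into a key/value pair and store it in a cache.
// Tweet, toEntry, saveBatch and the ConcurrentHashMap "cache" are hypothetical
// stand-ins, not Spark or Infinispan APIs.
class StreamConsumerSketch {

    // Hypothetical minimal tweet model.
    static final class Tweet {
        final long id;
        final String user;
        final String text;
        final String country; // null when the tweet carries no location

        Tweet(long id, String user, String text, String country) {
            this.id = id;
            this.user = user;
            this.text = text;
            this.country = country;
        }
    }

    // Transformation: keep only the fields needed later and default the
    // country to "N/A" (assumed placeholder, matching the SQL job's filter).
    static Map<String, Object> toEntry(Tweet t) {
        Map<String, Object> value = new HashMap<>();
        value.put("user", t.user);
        value.put("text", t.text);
        value.put("country", t.country == null ? "N/A" : t.country);
        return value;
    }

    // Save: write every transformed tweet of one batch into the cache, keyed by tweet id.
    static void saveBatch(List<Tweet> batch, Map<Long, Map<String, Object>> cache) {
        for (Tweet t : batch) {
            cache.put(t.id, toEntry(t));
        }
    }

    public static void main(String[] args) {
        Map<Long, Map<String, Object>> cache = new ConcurrentHashMap<>();
        List<Tweet> batch = List.of(
            new Tweet(1L, "alice", "Hello from London", "United Kingdom"),
            new Tweet(2L, "bob", "No location here", null));
        saveBatch(batch, cache);
        System.out.println(cache.size() + " tweets inserted in the cache");
        System.out.println("Country of tweet 2: " + cache.get(2L).get("country"));
    }
}
```

Keying by tweet id means a re-delivered tweet overwrites its earlier entry instead of being counted twice.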
2. Counting relevant words
The WordCount job runs a map-reduce job over all data currently in the cache, counting the words in the tweets while discarding commonly used ones, and prints the 20 most frequent words. The output looks like this:
'love' appears 96 times
'July' appears 66 times
'time' appears 62 times
To run the job:
./run-job.sh -m org.infinispan.spark.examples.twitter.WordCountJava
Use org.infinispan.spark.examples.twitter.WordCountScala for the Scala version.
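The map-reduce idea behind WordCount can be sketched with plain Java streams: split each tweet into words (map), drop stop words, count occurrences per word (reduce), and keep the most frequent ones. This is an illustration only; the stop-word set and the tokenization rule (split on anything that is not a letter) are assumptions, and the real job performs these steps as Spark RDD operations over the cache contents.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

class WordCountSketch {

    // Hypothetical stop-word list; the real job's list may differ.
    static final Set<String> STOP_WORDS =
        Set.of("the", "a", "an", "and", "to", "of", "in", "is", "i", "rt");

    static List<Map.Entry<String, Long>> topWords(List<String> tweets, int n) {
        // Map phase: split every tweet into words; filter out empty tokens and stop words.
        // Reduce phase: count the occurrences of each word.
        Map<String, Long> counts = tweets.stream()
            .flatMap(text -> Arrays.stream(text.split("[^\\p{L}]+")))
            .filter(word -> !word.isEmpty() && !STOP_WORDS.contains(word.toLowerCase()))
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

        // Sort by count descending (ties broken alphabetically) and keep the top n.
        return counts.entrySet().stream()
            .sorted((a, b) -> {
                int byCount = Long.compare(b.getValue(), a.getValue());
                return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
            })
            .limit(n)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> tweets = List.of(
            "I love the summer",
            "RT love in July",
            "July is the time to love");
        for (Map.Entry<String, Long> e : topWords(tweets, 20)) {
            System.out.println("'" + e.getKey() + "' appears " + e.getValue() + " times");
        }
    }
}
```

With these three sample tweets, 'love' comes first with 3 occurrences, followed by 'July' with 2.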
3. Aggregation using SQL
The SQLAggregation job uses Spark SQL to group the tweets by country and print the top 20 countries, with the query:
SELECT country, count(*) as c from tweets WHERE country != 'N/A' GROUP BY country ORDER BY c desc
To run the job:
./run-job.sh -m org.infinispan.spark.examples.twitter.SQLAggregationJava
Use org.infinispan.spark.examples.twitter.SQLAggregationScala for the Scala version. Sample output:
[United States,21]
[Indonesia,21]
[Türkiye,18]
[日本,17]
[Brasil,17]
[United Kingdom,17]
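For readers less familiar with SQL, the query used by SQLAggregation corresponds roughly to the plain-Java stream pipeline below: filter out rows whose country is N/A, group by country, count, sort by count descending, and keep the top 20. Reducing each row to its country string is a simplifying assumption; the actual job runs the SQL against a Spark DataFrame backed by the cache.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class SqlAggregationSketch {

    // Roughly equivalent to:
    //   SELECT country, count(*) as c FROM tweets WHERE country != 'N/A'
    //   GROUP BY country ORDER BY c DESC   (then keep the first n rows)
    static Map<String, Long> topCountries(List<String> countries, int n) {
        Map<String, Long> grouped = countries.stream()
            .filter(country -> !"N/A".equals(country))           // WHERE country != 'N/A'
            .collect(Collectors.groupingBy(Function.identity(),  // GROUP BY country
                Collectors.counting()));                         // count(*)
        return grouped.entrySet().stream()
            .sorted((a, b) -> Long.compare(b.getValue(), a.getValue())) // ORDER BY c DESC
            .limit(n)
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                (x, y) -> x, LinkedHashMap::new));               // preserve the sorted order
    }

    public static void main(String[] args) {
        List<String> countries = List.of(
            "Brasil", "N/A", "Indonesia", "Brasil", "N/A", "United Kingdom", "Brasil", "Indonesia");
        topCountries(countries, 20)
            .forEach((country, c) -> System.out.println("[" + country + "," + c + "]"));
    }
}
```

As with the SQL query, rows with equal counts come out in no guaranteed order.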
4. Listen to events from the cache
The StreamProducer job extends StreamConsumer and also creates an InfinispanInputDStream from the data inserted in the cache. Using a sliding window of 60 seconds, it prints a per-country summary of the tweets inserted in the cache.
To run the job:
./run-job.sh -m org.infinispan.spark.examples.twitter.StreamProducerJava -c <twitter.consumerKey> -k <twitter.consumerSecret> -t <twitter.accessToken> -s <twitter.accessTokenSecret>
Use org.infinispan.spark.examples.twitter.StreamProducerScala for the Scala version. Sample output:
---------- 1437480561000 ms ----------
[United Kingdom,5]
[ประเทศไทย,5]
[日本,5]
[Argentina,4]
[Republika ng Pilipinas,4]
[Indonesia,3]
[Brasil,3]
[United States,3]
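The effect of a 60-second sliding window can be illustrated without Spark. Each time the window slides, only the events inserted during the last 60 seconds are counted per country. This is a simplified sketch. The Event class and the on-demand evaluation of the window are assumptions. The real job relies on Spark Streaming's windowing over the InfinispanInputDStream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class SlidingWindowSketch {

    static final long WINDOW_MS = 60_000; // 60-second window, as described above

    // Hypothetical event: a tweet's country and the time it was inserted in the cache.
    static final class Event {
        final long timestampMs;
        final String country;

        Event(long timestampMs, String country) {
            this.timestampMs = timestampMs;
            this.country = country;
        }
    }

    // Counts events per country whose timestamp lies in (nowMs - WINDOW_MS, nowMs].
    // A TreeMap keeps the countries in alphabetical order for stable printing.
    static Map<String, Integer> countsInWindow(List<Event> events, long nowMs) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Event e : events) {
            if (e.timestampMs > nowMs - WINDOW_MS && e.timestampMs <= nowMs) {
                counts.merge(e.country, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event(5_000, "Argentina"));      // inside the window at t=60s, outside at t=70s
        events.add(new Event(30_000, "Indonesia"));
        events.add(new Event(65_000, "Indonesia"));
        events.add(new Event(69_000, "United States"));
        // Evaluate the window at two successive slides: t=60s and t=70s.
        for (long now : new long[] {60_000, 70_000}) {
            System.out.println("---------- " + now + " ms ----------");
            countsInWindow(events, now)
                .forEach((country, n) -> System.out.println("[" + country + "," + n + "]"));
        }
    }
}
```

Events drop out of the summary once they are older than the window, which is why the per-country counts printed by StreamProducer go up and down over time.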
5. Changing code
The sample source code can be changed without restarting the Docker containers, but the job jar must then be rebuilt with:
./sbt examplesTwitter/assembly
6. Stop containers
Troubleshooting
When you start StreamConsumer, you might see no tweets being consumed, for example:
Obtaining Spark master
Submitting the job
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/11/25 20:05:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
0 tweets inserted in the cache
Last tweet:<no tweets received so far>
0 tweets inserted in the cache
Last tweet:<no tweets received so far>
0 tweets inserted in the cache
Last tweet:<no tweets received so far>
0 tweets inserted in the cache
Last tweet:<no tweets received so far>
...
There are several possible causes for this.
First, verify that you passed the right credentials when executing the job.
Even if you pass the right credentials, you might see one of the workers report:
17/11/25 20:07:07 WARN ClientBase: Hosebird-client Error connecting w/ status code - 401, reason - Authorization Required
This error can be caused by the Docker daemon's clock being out of sync. Every request sent to Twitter includes the current timestamp, and if this timestamp is too far in the past, Twitter rejects the request (see here for more info). The Docker daemon's clock can drift out of sync when the computer has been put to sleep. To fix the problem, restart the Docker daemon.
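One way to check whether clock drift is the cause is to compare the local clock (for example, inside a container) with a trusted time source. The sketch below does only the arithmetic. It parses an HTTP Date header in RFC 1123 format, such as the one returned by any web server you trust, and reports the skew in seconds. How you obtain the header, and the tolerance passed as maxSkewSeconds, are assumptions. The exact tolerance Twitter applies is not specified here.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

class ClockSkewSketch {

    // Returns local time minus reference time, in seconds (negative = local clock is behind).
    static long skewSeconds(Instant local, String httpDateHeader) {
        Instant reference = ZonedDateTime
            .parse(httpDateHeader, DateTimeFormatter.RFC_1123_DATE_TIME)
            .toInstant();
        return Duration.between(reference, local).getSeconds();
    }

    // maxSkewSeconds is a hypothetical tolerance chosen by the caller.
    static boolean isOutOfSync(Instant local, String httpDateHeader, long maxSkewSeconds) {
        return Math.abs(skewSeconds(local, httpDateHeader)) > maxSkewSeconds;
    }

    public static void main(String[] args) {
        // Example values: a local clock lagging 10 minutes behind the reference time.
        String header = "Sat, 25 Nov 2017 20:17:07 GMT";
        Instant local = Instant.parse("2017-11-25T20:07:07Z");
        System.out.println("Skew: " + skewSeconds(local, header) + " s");
        System.out.println("Out of sync: " + isOutOfSync(local, header, 300));
    }
}
```

A large negative skew means the local clock is behind, which matches the situation described above: the timestamp sent to Twitter lies too far in the past.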