-
Notifications
You must be signed in to change notification settings - Fork 0
/
pyspark_socket_streaming.py
50 lines (42 loc) · 1.8 KB
/
pyspark_socket_streaming.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split
import socket
import sys
# Port of the text server the socket source connects to.
SOCKET_PORT = 5011
# Seconds to let the streaming query run before it is stopped.
RUN_SECONDS = 55


def _open_socket_source(spark, host, port):
    """Return a streaming DataFrame that reads text lines from ``host:port``.

    ``includeTimestamp`` asks the socket source to add a timestamp column
    alongside ``value`` (the schema is printed by the caller).
    """
    return (
        spark.readStream
        .format('socket')
        .option('host', host)
        .option('port', port)
        .option('includeTimestamp', True)
        .load()
    )


def _count_words(lines_df):
    """Split each line's ``value`` on ' ' and return running per-word counts."""
    words_df = lines_df.select(explode(split(lines_df.value, ' ')).alias('words'))
    return words_df.groupBy('words').count()


def _run_console_query(counts_df, timeout):
    """Print updated counts to the console for up to ``timeout`` seconds.

    The query is always stopped before returning, whether it timed out,
    failed, or was interrupted.
    """
    # No .trigger(...) is set, so Spark's default micro-batch scheduling applies.
    query = (
        counts_df.writeStream
        .queryName('sink_socket_streaming')
        .format('console')
        .outputMode('update')
        .option('truncate', False)
        .start()
    )
    try:
        query.awaitTermination(timeout)
    finally:
        # awaitTermination(timeout) only waits; it leaves the query running
        # after the timeout, so stop it explicitly.
        query.stop()


def _stream_word_counts(spark, host, port, timeout):
    """Wire socket source -> word count -> console sink; return True on success.

    Each stage reports its own failure and returns False so the caller can
    still shut Spark down cleanly.
    """
    try:
        lines_df = _open_socket_source(spark, host, port)
    except Exception as err:
        print('Error establishing spark socket stream : \n', err, sys.exc_info())
        return False

    print('\n----- Is spark streaming: ', lines_df.isStreaming, '\n')
    lines_df.printSchema()

    try:
        counts_df = _count_words(lines_df)
    except Exception as err:
        print('Error in transformation:\n', err, sys.exc_info())
        return False

    try:
        _run_console_query(counts_df, timeout)
    except Exception as err:
        # Spark defines sources lazily, so problems such as an unreachable
        # socket server are expected to surface here, once the query starts
        # or runs, rather than being misreported as a SparkSession failure.
        print('Error running streaming query:\n', err, sys.exc_info())
        return False
    return True


def main():
    """Run the socket word-count demo and return a process exit status.

    Returns 0 when every stage succeeded and 1 when any stage failed.
    """
    host = socket.gethostbyname(socket.gethostname())
    try:
        spark = (
            SparkSession.builder
            .appName('socket_streaming')
            .master('local[*]')
            .getOrCreate()
        )
    except Exception as err:
        # No session exists yet, so there is nothing to stop.
        print('\nError creating SparkSession: \n\n', err, sys.exc_info())
        return 1

    try:
        try:
            ok = _stream_word_counts(spark, host, SOCKET_PORT, RUN_SECONDS)
        finally:
            print('\nStreaming Over.\n\n')
        if ok:
            print('All went good.\n')
        return 0 if ok else 1
    finally:
        spark.stop()
        print('SparkSession stopped!')


if __name__ == '__main__':
    sys.exit(main())