In [0]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages=com.qubole.spark/spark-sql-kinesis_2.11/1.1.3-spark_2.4 pyspark-shell'
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, FloatType, BinaryType

In [0]:
spark = SparkSession.builder \
         .master('local[*]') \
         .appName('SpeechEmotion') \
         .getOrCreate()

In [0]:
schema = StructType() \
          .add("user_id", IntegerType(), True) \
          .add("message_type", StringType(), True) \
          .add("mfcc", StringType(), True)

In [0]:
awsAccessKey = # update the access key
awsSecretKey = # update the secret key
kinesisStreamName = "audio_data"  # update the kinesis stream name
kinesisRegion = "us-east-1"

In [0]:
kinesisDF = spark \
        .readStream \
        .format('kinesis') \
        .option('streamName', kinesisStreamName) \
        .option('endpointUrl', 'https://kinesis.us-east-1.amazonaws.com')\
        .option('region', kinesisRegion) \
        .option('awsAccessKey', awsAccessKey) \
        .option('awsSecretKey', awsSecretKey) \
        .option('startingposition', 'TRIM_HORIZON')\
        .option('inferSchema', "true")\
        .load()

In [0]:
display(kinesisDF)

partitionKey,data,stream,shardId,sequenceNumber,approximateArrivalTimestamp
user_id,eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2VfdHlwZSI6ICJhdWRpbyIsICJtZmNjIjogWyItMTEzMS4zNzEiLCAiMC4wIiwgIjAuMCIsICIwLjAiLCAiMC4wIiwgIjAuMCIsICIwLjAiLCAiMC4wIiwgIjAuMCIsICIwLjAiLCAiMC4= (truncated),audio_data,shardId-000000000003,49630753398494293970312341631088863260158764368326033458,2022-07-01T06:26:09.471+0000
user_id,eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2VfdHlwZSI6ICJhdWRpbyIsICJtZmNjIjogWyItMTEzMS4zNzEiLCAiMC4wIiwgIjAuMCIsICIwLjAiLCAiMC4wIiwgIjAuMCIsICIwLjAiLCAiMC4wIiwgIjAuMCIsICIwLjAiLCAiMC4= (truncated),audio_data,shardId-000000000003,49630753398494293970312341631090072185978379066220216370,2022-07-01T06:26:10.143+0000
user_id,eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2VfdHlwZSI6ICJhdWRpbyIsICJtZmNjIjogWyItMTEzMS4zNzEiLCAiMC4wIiwgIjAuMCIsICIwLjAiLCAiMC4wIiwgIjAuMCIsICIwLjAiLCAiMC4wIiwgIjAuMCIsICIwLjAiLCAiMC4= (truncated),audio_data,shardId-000000000003,49630753398494293970312341631091281111797993695394922546,2022-07-01T06:26:10.788+0000
user_id,eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2VfdHlwZSI6ICJhdWRpbyIsICJtZmNjIjogWyItMTEzMS4zNzEiLCAiMC4wIiwgIjAuMCIsICIwLjAiLCAiMC4wIiwgIjAuMCIsICIwLjAiLCAiMC4wIiwgIjAuMCIsICIwLjAiLCAiMC4= (truncated),audio_data,shardId-000000000003,49630753398494293970312341631092490037617608393289105458,2022-07-01T06:26:11.455+0000
user_id,eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2VfdHlwZSI6ICJhdWRpbyIsICJtZmNjIjogWyItMTEzMS4zNzEiLCAiMC4wIiwgIjAuMCIsICIwLjAiLCAiMC4wIiwgIjAuMCIsICIwLjAiLCAiMC4wIiwgIjAuMCIsICIwLjAiLCAiMC4= (truncated),audio_data,shardId-000000000003,49630753398494293970312341631093698963437223091183288370,2022-07-01T06:26:12.109+0000
user_id,eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2VfdHlwZSI6ICJhdWRpbyIsICJtZmNjIjogWyItMTEzMS4zNzEiLCAiMC4wIiwgIjAuMCIsICIwLjAiLCAiMC4wIiwgIjAuMCIsICIwLjAiLCAiMC4wIiwgIjAuMCIsICIwLjAiLCAiMC4= (truncated),audio_data,shardId-000000000003,49630753398494293970312341631094907889256837720357994546,2022-07-01T06:26:12.742+0000
user_id,eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2VfdHlwZSI6ICJhdWRpbyIsICJtZmNjIjogWyItMTEzMS4zNzEiLCAiMC4wIiwgIjAuMCIsICIwLjAiLCAiMC4wIiwgIjAuMCIsICIwLjAiLCAiMC4wIiwgIjAuMCIsICIwLjAiLCAiMC4= (truncated),audio_data,shardId-000000000003,49630753398494293970312341631096116815076452418252177458,2022-07-01T06:26:13.374+0000
user_id,eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2VfdHlwZSI6ICJhdWRpbyIsICJtZmNjIjogWyItMTEzMS4zNzEiLCAiMC4wIiwgIjAuMCIsICIwLjAiLCAiMC4wIiwgIjAuMCIsICIwLjAiLCAiMC4wIiwgIjAuMCIsICIwLjAiLCAiMC4= (truncated),audio_data,shardId-000000000003,49630753398494293970312341631097325740896067047426883634,2022-07-01T06:26:13.988+0000
user_id,eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2VfdHlwZSI6ICJhdWRpbyIsICJtZmNjIjogWyItMTEzMS4zNzEiLCAiMC4wIiwgIjAuMCIsICIwLjAiLCAiMC4wIiwgIjAuMCIsICIwLjAiLCAiMC4wIiwgIjAuMCIsICIwLjAiLCAiMC4= (truncated),audio_data,shardId-000000000003,49630753398494293970312341631098534666715681745321066546,2022-07-01T06:26:14.664+0000
user_id,eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2VfdHlwZSI6ICJhdWRpbyIsICJtZmNjIjogWyItMTEzMS4zNzEiLCAiMC4wIiwgIjAuMCIsICIwLjAiLCAiMC4wIiwgIjAuMCIsICIwLjAiLCAiMC4wIiwgIjAuMCIsICIwLjAiLCAiMC4= (truncated),audio_data,shardId-000000000003,49630753398494293970312341631099743592535296374495772722,2022-07-01T06:26:15.389+0000


In [0]:
#extract data from the payload and use transformation to do your analytics
audioDataDF = kinesisDF \
  .selectExpr("cast (data as STRING) jsonData") \
  .select(from_json("jsonData", schema).alias("audio_data")) \
  .select("audio_data.mfcc")

In [0]:
display(audioDataDF)

mfcc
"{""0"":-721.9397583008,""1"":99.3131561279,""2"":-14.5079021454,""3"":2.5251142979,""4"":17.4713020325,""5"":-4.6686763763,""6"":7.7476963997,""7"":9.755074501,""8"":-13.3225402832,""9"":-8.131439209,""10"":0.7107235789,""11"":-8.3260536194,""12"":-5.3064522743,""13"":0.0687498525,""14"":-4.0061798096,""15"":-0.8434225321,""16"":2.2199401855,""17"":-1.4296724796,""18"":-2.4533205032,""19"":-5.8325409889,""20"":118.1642150879,""21"":84.5949478149,""22"":22.0307674408,""23"":19.710477829,""24"":16.6391906738,""25"":15.5559663773,""26"":13.0285234451,""27"":16.8610477448,""28"":17.4025001526,""29"":15.6740179062,""30"":9.4060287476,""31"":10.0458164215,""32"":9.6847629547,""33"":7.631456852,""34"":8.2698793411,""35"":7.2111349106,""36"":8.8027505875,""37"":7.0534267426,""38"":7.3397312164,""39"":9.3369998932}"
"{""0"":-691.9147949219,""1"":93.5046768188,""2"":-25.7549495697,""3"":6.8084897995,""4"":18.2052459717,""5"":-13.3437414169,""6"":6.5107331276,""7"":11.6757516861,""8"":-12.4186096191,""9"":-3.0736691952,""10"":4.0990056992,""11"":-6.0062799454,""12"":0.9667653441,""13"":2.3489747047,""14"":-7.1542582512,""15"":-3.5149872303,""16"":-1.8059973717,""17"":-4.3008346558,""18"":0.699177146,""19"":-1.1640911102,""20"":133.3751373291,""21"":85.7208633423,""22"":36.245300293,""23"":16.7507305145,""24"":17.4436836243,""25"":21.0895442963,""26"":13.8435029984,""27"":18.4682617188,""28"":14.8527011871,""29"":12.894859314,""30"":6.7428979874,""31"":8.4621458054,""32"":7.3358154297,""33"":6.7116780281,""34"":10.5756587982,""35"":7.438000679,""36"":5.7263112068,""37"":5.9683732986,""38"":6.5986094475,""39"":4.7803106308}"
"{""0"":-706.8563842773,""1"":102.5111312866,""2"":-27.6363830566,""3"":-1.1227332354,""4"":23.9937896729,""5"":-13.4869861603,""6"":-16.838312149,""7"":-2.6999747753,""8"":-7.2093710899,""9"":-2.8761229515,""10"":0.3282948136,""11"":-7.4888572693,""12"":-6.6113257408,""13"":-2.2905154228,""14"":-2.9765663147,""15"":-5.2225737572,""16"":-9.0989179611,""17"":-8.9049215317,""18"":-7.0519504547,""19"":-8.1444444656,""20"":133.3222503662,""21"":88.3667984009,""22"":32.8388023376,""23"":19.8206558228,""24"":24.1876163483,""25"":18.39884758,""26"":20.1578388214,""27"":12.7191791534,""28"":10.7838039398,""29"":7.1219940186,""30"":6.6892585754,""31"":12.5741424561,""32"":9.3054170609,""33"":7.4556617737,""34"":11.0134143829,""35"":8.2281312943,""36"":10.8148384094,""37"":10.018743515,""38"":9.3567533493,""39"":12.1178712845}"
"{""0"":-808.8944702148,""1"":134.5643920898,""2"":-36.9766311646,""3"":4.307533741,""4"":24.5335636139,""5"":-14.4832172394,""6"":7.9071140289,""7"":13.105383873,""8"":-10.4669055939,""9"":4.0771746635,""10"":4.581433773,""11"":-10.3755893707,""12"":0.9408499002,""13"":-0.5963837504,""14"":-9.0200386047,""15"":1.9116352797,""16"":-0.9627247453,""17"":-6.9399371147,""18"":1.910930872,""19"":-2.437795639,""20"":96.8862762451,""21"":71.5512237549,""22"":16.2785720825,""23"":11.2788181305,""24"":12.2801780701,""25"":9.7745189667,""26"":7.2526760101,""27"":6.5258460045,""28"":7.9452567101,""29"":8.8959226608,""30"":5.9751353264,""31"":8.6884784698,""32"":6.1257829666,""33"":5.7458410263,""34"":7.0737805367,""35"":4.6630949974,""36"":5.65421772,""37"":5.6114635468,""38"":4.5561156273,""39"":5.8695478439}"
"{""0"":-641.8674316406,""1"":107.0280990601,""2"":-29.6452598572,""3"":-8.3094997406,""4"":17.1004505157,""5"":-8.4332609177,""6"":-2.1830298901,""7"":4.1114554405,""8"":-8.2135076523,""9"":0.6839445829,""10"":4.7232780457,""11"":-7.0124044418,""12"":-3.2195308208,""13"":1.0340108871,""14"":-6.6466159821,""15"":-7.1316347122,""16"":-2.6455771923,""17"":-1.2581278086,""18"":-2.05966115,""19"":-5.3919324875,""20"":150.4940795898,""21"":100.6359024048,""22"":32.5385742188,""23"":11.4550237656,""24"":18.2955722809,""25"":15.3820466995,""26"":7.3537712097,""27"":6.7073869705,""28"":11.8087921143,""29"":6.6877799034,""30"":6.4382367134,""31"":10.4977560043,""32"":7.9048995972,""33"":4.5365891457,""34"":7.5894508362,""35"":11.1630764008,""36"":5.2693510056,""37"":7.3707509041,""38"":6.5478401184,""39"":11.0147123337}"
"{""0"":-707.7840576172,""1"":125.9795455933,""2"":-35.5319595337,""3"":2.0589425564,""4"":22.0095787048,""5"":-17.5056285858,""6"":3.8477692604,""7"":15.4710121155,""8"":-7.9470262527,""9"":1.7306491137,""10"":5.3952512741,""11"":-9.9671239853,""12"":-1.9220577478,""13"":1.6645934582,""14"":-6.3722701073,""15"":-0.1139035746,""16"":0.0106325736,""17"":-5.2562093735,""18"":0.2418410629,""19"":-1.9706650972,""20"":144.0324554443,""21"":96.3877029419,""22"":26.5238037109,""23"":11.1425256729,""24"":16.4849033356,""25"":15.782746315,""26"":6.0540437698,""27"":13.3269767761,""28"":6.6677651405,""29"":4.3559675217,""30"":5.6895132065,""31"":8.0968914032,""32"":4.9939255714,""33"":4.0494823456,""34"":5.4650645256,""35"":4.588183403,""36"":3.4150042534,""37"":4.8588685989,""38"":3.6579070091,""39"":4.7547588348}"
"{""0"":-759.7575683594,""1"":105.5569610596,""2"":-30.3027706146,""3"":3.8629837036,""4"":24.5069618225,""5"":-10.6748275757,""6"":0.8174120784,""7"":8.976401329,""8"":-6.8878517151,""9"":3.5602529049,""10"":5.1043715477,""11"":-9.2660112381,""12"":-0.5768398046,""13"":4.1709747314,""14"":-4.9625134468,""15"":-1.6161832809,""16"":0.4094303548,""17"":-3.5068736076,""18"":-1.8022674322,""19"":-4.5931820869,""20"":115.7611846924,""21"":78.1910552979,""22"":22.3397026062,""23"":20.3749675751,""24"":20.3103542328,""25"":17.3666000366,""26"":11.1467199326,""27"":7.6909389496,""28"":10.8845300674,""29"":9.2977142334,""30"":7.3825101852,""31"":10.0622320175,""32"":9.248292923,""33"":7.3871159554,""34"":5.4772768021,""35"":6.0747394562,""36"":6.4450416565,""37"":7.0547575951,""38"":5.188038826,""39"":6.7767305374}"
"{""0"":-740.2051391602,""1"":115.7643051147,""2"":-23.4190750122,""3"":-4.8578977585,""4"":17.6587715149,""5"":-3.6537568569,""6"":2.004938364,""7"":-2.1685092449,""8"":-12.0928039551,""9"":5.503698349,""10"":3.4238481522,""11"":-9.1971435547,""12"":4.9062857628,""13"":5.4382247925,""14"":-9.8117084503,""15"":-6.9701223373,""16"":-5.8819389343,""17"":-10.2759857178,""18"":-3.3681604862,""19"":-0.5815266371,""20"":116.6938552856,""21"":86.9770965576,""22"":22.2903766632,""23"":22.2855148315,""24"":14.421754837,""25"":7.7316637039,""26"":7.6221380234,""27"":10.0889129639,""28"":11.1651687622,""29"":7.998691082,""30"":5.4375667572,""31"":9.9981222153,""32"":9.2888793945,""33"":8.1127958298,""34"":10.0565004349,""35"":11.5510549545,""36"":12.444978714,""37"":11.0150909424,""38"":8.1762037277,""39"":4.9552378654}"
"{""0"":-749.0033569336,""1"":113.6661682129,""2"":-46.4700889587,""3"":-14.1842403412,""4"":14.3968544006,""5"":-8.8242416382,""6"":8.2549972534,""7"":8.9070415497,""8"":-7.940735817,""9"":6.5992722511,""10"":3.6691570282,""11"":-8.1776189804,""12"":7.7421364784,""13"":5.1154799461,""14"":-9.8421268463,""15"":-1.9590995312,""16"":-2.1652693748,""17"":-7.3175783157,""18"":1.8258882761,""19"":-2.4337728024,""20"":120.9316177368,""21"":78.053604126,""22"":35.2087173462,""23"":20.1577415466,""24"":8.8241415024,""25"":11.1837902069,""26"":8.9210681915,""27"":7.7632775307,""28"":7.392932415,""29"":9.5239934921,""30"":4.9293541908,""31"":6.7329759598,""32"":10.5383224487,""33"":8.9519748688,""34"":10.2578239441,""35"":6.8681335449,""36"":5.4524812698,""37"":6.574555397,""38"":5.0009813309,""39"":8.0377931595}"
"{""0"":-708.4501342773,""1"":151.2099609375,""2"":-58.9724655151,""3"":-5.8433003426,""4"":24.5331573486,""5"":-21.4623622894,""6"":3.0414836407,""7"":9.3750295639,""8"":-16.2206401825,""9"":3.8151352406,""10"":5.4878230095,""11"":-12.204082489,""12"":3.7456116676,""13"":3.8815224171,""14"":-10.7133207321,""15"":-2.2065865993,""16"":-2.6290113926,""17"":-6.9483013153,""18"":2.6637396812,""19"":-2.2806756496,""20"":157.1452331543,""21"":101.5475692749,""22"":35.2658920288,""23"":10.4424991608,""24"":14.1129026413,""25"":14.6854171753,""26"":6.1302981377,""27"":8.0051879883,""28"":11.3114814758,""29"":4.4411120415,""30"":5.1452469826,""31"":8.386554718,""32"":6.9579792023,""33"":5.4336495399,""34"":7.6774215698,""35"":6.1236348152,""36"":5.3939604759,""37"":5.2888908386,""38"":4.0817394257,""39"":6.4987859726}"


In [0]:
audioDataDF.writeStream.outputMode('append').format("csv").option(
        "path", 'dbfs:/Users/thisisvij98@gmail.com/output/data.csv').option("checkpointLocation", 'dbfs:/Users/thisisvij98@gmail.com/output/').trigger(processingTime="5 seconds").start()#.awaitTermination()

Out[93]: <pyspark.sql.streaming.StreamingQuery at 0x7fe1b51a0100>

In [0]:
display(df)

In [0]:
audioDataDF = audioDataDF.select(*(col(c).cast("float").alias(c) for c in audioDataDF.columns[:]))

In [0]:
display(audioDataDF)

mfcc
""
""
""
""
""
""
""
""
""
""


In [0]:
model=GBTClassificationModel.load('D:/study_material/pg/MLBD/GBTClassifier')
y_pred=model.transform(feature_df)       
pred=y_pred.select("prediction")
print(type(pred))
pred.show(45)    


[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
[0;32m<command-2539502277203650>[0m in [0;36m<module>[0;34m[0m
[0;32m----> 1[0;31m [0maudio_data_df[0m[0;34m.[0m[0msql[0m[0;34m([0m[0;34m"select mfcc from  audio_data_df"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;31mNameError[0m: name 'audio_data_df' is not defined