In [4]:
from pyspark.sql.types import DoubleType

input_location = 'abfss://trade@gksynapsestorage.dfs.core.windows.net/FUTURES_TRADES.txt'

df = spark.read.format('csv').option("header", False).option("sep", ",").load(input_location, format='csv')
display(df.limit(10))
df = df.withColumnRenamed('_c0', 'Date') \
.withColumnRenamed('_c1', 'Time') \
.withColumnRenamed('_c2', 'Value') \
.withColumnRenamed('_c3', 'Volume')
df.show(5)

df = df.dropDuplicates(["Date","Time"])
df.show(5)

from pyspark.sql.functions import when
from pyspark.sql import functions as F
df = df.withColumn('JoinedColumn',
F.concat(F.col('Date'),F.lit(' '), F.col('Time')))
df = df.withColumn("Timestamp", F.to_timestamp("JoinedColumn", "dd/MM/yyyy HH:mm:ss"))
df = df.withColumn("Name", when(F.isnull('Value'), None).otherwise('ADA'))
df = df.withColumn("Value", df["Value"].cast(DoubleType()))
df.printSchema()
df.show(5)

min_df = df.groupBy("Name", F.window("Timestamp", "5 minutes")).min("Value")
min_df = min_df.sort('window','Name')
min_df.show(5)

max_df = df.groupBy("Name", F.window("Timestamp", "5 minutes")).max("Value")
max_df = max_df.sort('window','Name')
max_df.show(5)

avg_df = df.groupBy("Name", F.window("Timestamp", "5 minutes")).avg("Value")
avg_df = avg_df.sort('window','Name')
avg_df.show(5)

merged_df = min_df.join(max_df, on=['Name', 'window'], how='left_outer')
merged_df = merged_df.join(avg_df, on=['Name', 'window'], how='left_outer')
merged_df = merged_df.sort('window','Name')
merged_df.show(5)

merged_df = merged_df.withColumn('Value Change in %', when(F.isnull('Name'),None).otherwise((merged_df['max(Value)'] - merged_df['min(Value)'])/ 100))
merged_df.show(5)

# save to db

#tableDf = merged_df.selectExpr("Name as name", "window as timestamp","min(Value) as Min","max(Value) as Max"," avg(Value) Avg")
# tableDf.show(10)


# from functools import reduce

StatementMeta(workshopspark1, 0, 4, Finished, Available)

+----------+--------+-----+------+
|      Date|    Time|Value|Volume|
+----------+--------+-----+------+
|06/10/2010|08:00:00|  106|348746|
|06/10/2010|08:00:00|  105|331580|
|06/10/2010|08:00:00|105.5|352352|
|06/10/2010|08:00:00|106.5|347253|
|06/10/2010|08:00:00|  105|330164|
+----------+--------+-----+------+
only showing top 5 rows

+----------+--------+-----+------+
|      Date|    Time|Value|Volume|
+----------+--------+-----+------+
|06/10/2010|08:26:29|106.5|346184|
|06/10/2010|08:27:13|  106|345722|
|06/10/2010|09:05:02|  111|351804|
|06/10/2010|09:05:05|109.5|347778|
|06/10/2010|09:12:50|113.5|343770|
+----------+--------+-----+------+
only showing top 5 rows

root
 |-- Date: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Value: double (nullable = true)
 |-- Volume: string (nullable = true)
 |-- JoinedColumn: string (nullable = true)
 |-- Timestamp: timestamp (nullable = true)
 |-- Name: string (nullable = true)

+----------+--------+-----+------+---------

SynapseWidget(Synapse.DataFrame, c40f1c19-f1c4-4282-8abd-5c1298c337df)

In [5]:
%%pyspark 

df = (merged_df.withColumnRenamed("Name", "name")
.withColumnRenamed("window", "timestamp")
.withColumnRenamed("min(Value)","Min")
.withColumnRenamed("max(Value)","Max")
.withColumnRenamed("avg(Value)","Avg")
.withColumnRenamed("Value Change in %","Change")


)
df.show()
df.write.mode("overwrite").saveAsTable("default.t2")



StatementMeta(workshopspark1, 0, 5, Finished, Available)

+----+--------------------+-----+-----+------------------+------+
|name|           timestamp|  Min|  Max|               Avg|Change|
+----+--------------------+-----+-----+------------------+------+
| ADA|[2010-10-06 08:00...|101.5|107.5|104.59722222222223|  0.06|
| ADA|[2010-10-06 08:05...| 98.0|106.0|100.97058823529412|  0.08|
| ADA|[2010-10-06 08:10...| 99.5|109.0|104.49253731343283| 0.095|
| ADA|[2010-10-06 08:15...|106.0|111.5|107.96428571428571| 0.055|
| ADA|[2010-10-06 08:20...|105.5|110.5| 108.5909090909091|  0.05|
| ADA|[2010-10-06 08:25...|105.5|110.5|107.41666666666667|  0.05|
| ADA|[2010-10-06 08:30...|107.0|112.0|110.58823529411765|  0.05|
| ADA|[2010-10-06 08:35...|102.0|110.0|        106.109375|  0.08|
| ADA|[2010-10-06 08:40...| 99.5|106.5|101.92537313432835|  0.07|
| ADA|[2010-10-06 08:45...|102.5|106.0|            104.56| 0.035|
| ADA|[2010-10-06 08:50...|105.0|110.0|107.28358208955224|  0.05|
| ADA|[2010-10-06 08:55...|106.0|112.5|109.52307692307693| 0.065|
| ADA|[201

In [6]:
%%pyspark

myNewPythonDataFrame = spark.sql("SELECT * FROM default.t2")
display(myNewPythonDataFrame)

StatementMeta(workshopspark1, 0, 6, Finished, Available)

SynapseWidget(Synapse.DataFrame, c3d81418-7297-4f6b-83ec-cb31f306516a)

In [7]:
%%spark
val df = spark.read.sqlanalytics("workshoppool1.dbo.departments") 
df.show()

StatementMeta(workshopspark1, 0, 8, Finished, Available)

df: org.apache.spark.sql.DataFrame = [name: string, category: string]
+---------+--------+
|     name|category|
+---------+--------+
|  Finance|   extra|
|Marketing|   addon|
|    Sales| special|
+---------+--------+



In [9]:
%%pyspark 
# NOT WORKING
df = spark.read.sqlanalytics("workshoppool1.dbo.departments") 
df.show(2)

StatementMeta(workshopspark1, 0, 10, Finished, Available)

AnalysisException: Table or view not found: `dbo`.`departments`; line 1 pos 14;
'Project [*]
+- 'UnresolvedRelation `dbo`.`departments`
