In [22]:
from pyspark.sql import SparkSession, SQLContext
from datetime import datetime
import pandas
from pyspark.sql.functions import pandas_udf, window, month, weekofyear, to_date

In [24]:
spark = SparkSession \
  .builder \
  .master('yarn') \
  .appName('spark-bigquery-demo') \
  .getOrCreate()

In [3]:
# Load data from BigQuery.
fin = spark.read.format('bigquery') \
  .option('table', 'proud-sweep-342309:yahoofinance.bitcoin') \
  .load()

In [31]:
fin.createOrReplaceTempView('finsql')
# Perform word count.
one_day = spark.sql("""SELECT * from
                         finsql
                         WHERE cast( Datetime  as date) == cast(NOW() as date)
                         """)

In [32]:
print( one_day.count() )
one_day.printSchema()

[Stage 18:>                                                         (0 + 1) / 1]

223
root
 |-- Datetime: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: long (nullable = true)
 |-- _PARTITIONTIME: timestamp (nullable = true)
 |-- _PARTITIONDATE: date (nullable = true)



                                                                                

In [33]:
groups = one_day.groupby(window('Datetime','5 minutes')).agg({
    'Open':'last','High':'last','Low':'last','Close':'last', 'Volume':'sum'
})

In [34]:
groups.select('window')

DataFrame[window: struct<start:timestamp,end:timestamp>]

In [35]:
monthdf = groups.withColumn('MonthYear', month(groups.window.end))
monthweekdf = monthdf.withColumn('Week_Number', weekofyear(groups.window.end))

In [36]:
monthweekdf.createOrReplaceTempView('df')
# Perform word count.
five_min_agg = spark.sql('SELECT * from df LIMIT 10')

In [37]:
five_min_agg.show()

[Stage 21:>                                                         (0 + 1) / 1]

+--------------------+-----------+--------------+--------------+--------------+--------------+---------+-----------+
|              window|sum(Volume)|    last(High)|   last(Close)|    last(Open)|     last(Low)|MonthYear|Week_Number|
+--------------------+-----------+--------------+--------------+--------------+--------------+---------+-----------+
|{2022-03-08 04:05...|   65226752|   38745.65625|   38745.65625|   38745.65625|   38745.65625|        3|         10|
|{2022-03-08 04:00...|    6119424|38684.60546875|38684.60546875|38684.60546875|38684.60546875|        3|         10|
|{2022-03-08 01:05...|    8820736|  38199.671875|  38199.671875|  38199.671875|  38199.671875|        3|         10|
|{2022-03-08 02:35...|    1718272|38375.82421875|38375.82421875|38375.82421875|38375.82421875|        3|         10|
|{2022-03-08 02:20...|   39682048|38360.87890625|38360.87890625|38360.87890625|38360.87890625|        3|         10|
|{2022-03-08 03:50...|    5595136|38602.40234375|38602.40234375|

                                                                                

In [38]:
five_min_agg.printSchema()

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- sum(Volume): long (nullable = true)
 |-- last(High): double (nullable = true)
 |-- last(Close): double (nullable = true)
 |-- last(Open): double (nullable = true)
 |-- last(Low): double (nullable = true)
 |-- MonthYear: integer (nullable = true)
 |-- Week_Number: integer (nullable = true)



In [39]:
BUCKET="proud-sweep-342309"

# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the connector.
bucket = BUCKET
spark.conf.set('temporaryGcsBucket', bucket)

In [40]:
colnames = ["Window","Volume","High","Close","Open","Low","Month","WeekNum"]
new_df = monthweekdf.toDF(*colnames)
# new_df.show()

In [42]:
# Saving the data to BigQuery
new_df.write.format('bigquery') \
  .option('table', 'proud-sweep-342309:yahoofinance.bitcoin_5_min') \
  .mode("append") \
  .save()

                                                                                