### Note: For CMA buffers (inStream/outStream), avoid printing large amounts of data; doing so crashes the kernel on the ZCU104. The PYNQ-Z2 is not affected, as its Jupyter Notebook behaves differently.

In [1]:
import findspark
# Runs before the `import pyspark` cell that follows; presumably this locates
# the Spark installation so that import can resolve — TODO confirm.
findspark.init()

In [2]:
import pyspark

In [3]:
from pyspark.sql import SparkSession

In [4]:
# Display the PySpark version in use (the bare expression is the cell output).
pyspark.__version__

'2.4.5'

In [5]:
# Build (or reuse) the local Spark session used to read the CSV batches.
# Arrow-based conversion is requested; the fallback flag lets Spark carry on
# without the optimization when PyArrow is not installed.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config('spark.sql.execution.arrow.fallback.enabled', 'true')
    .config("spark.sql.execution.arrow.enabled", "true")
    .master("local[*]")
    .getOrCreate()
)


In [6]:
import os

In [7]:
# Show the notebook's working directory; the bitstream path is built from it.
os.getcwd()

'/home/xilinx/jupyter_notebooks'

In [8]:
from pynq import Overlay, Clocks

# Report Clocks.cpu_mhz and Clocks.fclk0_mhz. The FCLK0 value is used later
# to convert hardware-timer ticks into seconds.
print(f'CPU:   {Clocks.cpu_mhz:.6f}MHz')
print(f'FCLK0: {Clocks.fclk0_mhz:.6f}MHz')


CPU:   1199.988000MHz
FCLK0: 99.999000MHz


In [9]:
# Use bitstream for PL: load dpu.bit from smart_parking_stream/dpu_ip under
# the current working directory.
overlay = Overlay(os.getcwd() + "/smart_parking_stream/dpu_ip/dpu.bit")

In [10]:
# IPython introspection: show the help text for the loaded overlay.
overlay?

In [11]:
# DPU IP: handle to the `dpu_ip` attribute of the overlay, followed by its
# IPython help.
dpu_ip = overlay.dpu_ip
dpu_ip?

In [12]:
# DMA IP: handle to the `axi_dma` attribute of the overlay, followed by its
# IPython help.
dma = overlay.axi_dma
dma?

In [13]:
# AXI timer: handle to the `axi_timer` attribute of the overlay, followed by
# its IPython help. The timer helper functions operate on this handle.
hw_timer = overlay.axi_timer
hw_timer?

In [14]:
# Show the registers we need to access. They can be reached by name or by
# direct memory access; access by name is easier.
dpu_ip.register_map


RegisterMap {
  CTRL = Register(AP_START=0, AP_DONE=0, AP_IDLE=1, AP_READY=0, RESERVED_1=0, AUTO_RESTART=0, RESERVED_2=0),
  GIER = Register(Enable=0, RESERVED=0),
  IP_IER = Register(CHAN0_INT_EN=0, CHAN1_INT_EN=0, RESERVED=0),
  IP_ISR = Register(CHAN0_INT_ST=0, CHAN1_INT_ST=0, RESERVED=0),
  max_size = Register(max_size=0),
  max_size_ctrl = Register(max_size_ap_vld=0, RESERVED=0)
}

In [15]:
# Smart Parking Stream

# Set AUTO_RESTART first: it ensures the AP_START signal does not go low
# after one cycle. With AXI-Stream, setting only AP_START enables
# computation for a single stream.
dpu_ip.register_map.CTRL.AUTO_RESTART = 1
# Computations occur while AP_START is high.
dpu_ip.register_map.CTRL.AP_START = 1

In [16]:
# Check that the control signals were set and that the read-only registers
# now hold values.
dpu_ip.register_map

RegisterMap {
  CTRL = Register(AP_START=1, AP_DONE=1, AP_IDLE=0, AP_READY=0, RESERVED_1=0, AUTO_RESTART=1, RESERVED_2=0),
  GIER = Register(Enable=0, RESERVED=0),
  IP_IER = Register(CHAN0_INT_EN=0, CHAN1_INT_EN=0, RESERVED=0),
  IP_ISR = Register(CHAN0_INT_ST=0, CHAN1_INT_ST=0, RESERVED=0),
  max_size = Register(max_size=4294967295),
  max_size_ctrl = Register(max_size_ap_vld=1, RESERVED=0)
}

In [17]:
# Maximum number of elements the DPU accepts in an input stream.
## This limit applies to the finite-size for-loop implementation.
## A while-loop implementation can accept a stream of unlimited size.
max_size = int(dpu_ip.register_map.max_size)
print(max_size)

4294967295


In [18]:
################# Test: Feed a single stream ##################

In [19]:
########## Timer Functions ################
def init_timer():
    """Configure the TCSR0 register of the global ``hw_timer`` for measuring.

    Writes three fields, in this order: ``MDT0 = 0`` (generate mode),
    ``UDT0 = 1`` (down counter) and ``ARHT0 = 0`` (don't overwrite the
    load value).
    """
    tcsr0 = hw_timer.register_map.TCSR0
    tcsr0.MDT0 = 0   # generate mode
    tcsr0.UDT0 = 1   # count down
    tcsr0.ARHT0 = 0  # don't overwrite load value
    
# Reset/load the counter registers.
## Note: run this each time before measuring time.
def reset_timer():
    """Reload the counter of the global ``hw_timer`` with the maximum count.

    Writes ``0xFFFFFFFF`` to TLR0, pulses the LOAD0 bit of TCSR0 high so the
    value is loaded from TLR0, then clears LOAD0 again so that the timer can
    be enabled afterwards.
    """
    regs = hw_timer.register_map
    regs.TLR0 = 0xFFFFFFFF  # MAX_COUNT
    timer_ctrl = regs.TCSR0
    timer_ctrl.LOAD0 = 1  # load value from TLR0
    timer_ctrl.LOAD0 = 0  # clear the load bit so the timer can be enabled
    
def start_timer():
    """Start the global ``hw_timer`` by setting the ENT0 field of TCSR0 to 1."""
    control = hw_timer.register_map.TCSR0
    control.ENT0 = 1
    
def stop_timer():
    """Stop the global ``hw_timer`` by clearing the ENT0 field of TCSR0 to 0."""
    control = hw_timer.register_map.TCSR0
    control.ENT0 = 0

In [21]:
# Configure the timer mode and load the maximum count before any measurement.
init_timer()
reset_timer()
# Check signals were set
print(hw_timer.register_map)

RegisterMap {
  TCSR0 = Register(MDT0=0, UDT0=1, GENT0=0, CAPT0=0, ARHT0=0, LOAD0=0, ENIT0=0, ENT0=0, T0INT=0, PWMA0=0, ENALL=0, CASC=0),
  TLR0 = Register(TCLR0=4294967295),
  TCR0 = Register(TCR0=4294967295),
  TCSR1 = Register(MDT1=0, UDT1=0, GENT1=0, CAPT1=0, ARHT1=0, LOAD1=0, ENIT1=0, ENT1=0, T1INT=0, PWMA1=0, ENALL=0),
  TLR1 = Register(TCLR1=0),
  TCR1 = Register(TCR1=0)
}


In [54]:
########## HW Timer Measurement ###########
# Times `streams` repetitions of: read the CSV batch through Spark, convert it
# to a flat NumPy array, and stage it in freshly allocated CMA buffers.
# Everything between start_timer() and stop_timer() is included in the
# measurement. No DMA transfer is issued in this cell.

import time
import pandas as pd
import numpy as np
from pynq import allocate

streams = 10      # number of batches to process
row_length = 2    # input values per record; one output element per record

# Every CMA buffer allocated inside the timed loop is recorded here so that
# all of them (not only the last pair) are released afterwards. Recording is
# a list append, so freeing stays outside the timed region.
cma_buffers = []

reset_timer()

start_timer()
try:
    for i in range(streams):
        df = spark.read.format("csv").option("header", "true").load('hdfs://afog-master:9000/part4-projects/resources/timing-exps/*.csv')
        # Drop the third column; only the first two are streamed.
        df = df.drop(df.columns[2])

        # Transform to NumPy array
        df_pd = df.toPandas()
        npArr = df_pd.to_numpy()

        # Convert 2D to 1D array
        npArr = npArr.flatten()

        # Allocate the input buffer and prepare the output buffer, provided
        # the batch fits within the size the DPU reports.
        if max_size >= npArr.size:
            inStream = allocate(shape=npArr.shape, dtype=np.uint32)
            cma_buffers.append(inStream)

            inStream[:] = npArr[:]

            outStream = allocate(shape=(len(npArr)//row_length,), dtype=np.uint32)
            cma_buffers.append(outStream)
        else:
            print("Use a lower batch size than {0} or implement alternate design that removes batch size limit".format(max_size))
finally:
    # Always stop the timer, even if reading or allocation failed, and then
    # hand back every contiguous buffer obtained above. When no buffer was
    # allocated this loop is a no-op instead of a NameError.
    stop_timer()
    for cma_buffer in cma_buffers:
        cma_buffer.freebuffer()

print("Completed transfer")

  PyArrow >= 0.8.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.fallback.enabled' is set to true.


Completed transfer


In [55]:
# The timer counts down from the value held in TLR0, so the number of elapsed
# ticks is the load value minus the current counter value (TCR0).
process_count = int(hw_timer.register_map.TCR0)
max_count = int(hw_timer.register_map.TLR0)
# Convert ticks to seconds using the FCLK0 frequency. The result has its own
# name so that the imported `time` module is not overwritten.
elapsed_s = (max_count - process_count) * 1/(Clocks.fclk0_mhz * 10**6)
print("HW timer measurement for {0} batches of {1} records in s: {2}".format(streams, outStream.size,  elapsed_s))
print("Time taken for 1 records in s: {0}".format((elapsed_s)/(outStream.size * streams)))


HW timer measurement for 10 batches of 15360 records in s: 17.990112391123912
Time taken for 1 records in s: 0.00011712312754637964


In [40]:

# Print the timer registers after the measurement; TCR0 holds the counter value.
print(hw_timer.register_map)

RegisterMap {
  TCSR0 = Register(MDT0=0, UDT0=1, GENT0=0, CAPT0=0, ARHT0=0, LOAD0=0, ENIT0=0, ENT0=0, T0INT=0, PWMA0=0, ENALL=0, CASC=0),
  TLR0 = Register(TCLR0=4294967295),
  TCR0 = Register(TCR0=3143225133),
  TCSR1 = Register(MDT1=0, UDT1=0, GENT1=0, CAPT1=0, ARHT1=0, LOAD1=0, ENIT1=0, ENT1=0, T1INT=0, PWMA1=0, ENALL=0),
  TLR1 = Register(TCLR1=0),
  TCR1 = Register(TCR1=0)
}
