In [None]:
from pyspark import pipelines as dp
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [None]:
# Bronze column name -> silver column name. Insertion order is significant: it
# is the silver table's column order and the order of the not-null
# expectations declared on stock_prices_silver below.
_STOCK_PRICES_SILVER_RENAMES = {
    'Date': 'trade_date',
    'Open': 'open_price',
    'High': 'high_price',
    'Low': 'low_price',
    'Close': 'close_price',
    'Volume': 'volume',
    'Brand_Name': 'brand_name',
    'Ticker': 'ticker',
    'Industry_Tag': 'industry_tag',
    'Country': 'country',
}

# Silver (already renamed) columns that are cast to double.
_STOCK_PRICES_SILVER_DOUBLES = (
    'open_price',
    'high_price',
    'low_price',
    'close_price',
    'volume',
)


@dp.table(name='stock_prices_silver')
@dp.expect_all_or_drop({
    # One '<column>_not_null' rule per silver column, e.g.
    # 'trade_date_not_null': 'trade_date IS NOT NULL'.
    f'{name}_not_null': f'{name} IS NOT NULL'
    for name in _STOCK_PRICES_SILVER_RENAMES.values()
})
def stock_prices_silver():
    """Build the silver stock-price dataset from the bronze table.

    Reads ``stock_prices.source.stock_prices_bronze`` with a batch read
    (``spark.read``, not ``readStream``). It renames the bronze columns to
    their snake_case silver names and parses ``trade_date`` with
    ``to_date``. It casts the four price columns and ``volume`` to double,
    then keeps only the ten silver columns. ``expect_all_or_drop`` drops
    every row in which any of those ten columns is NULL.

    NOTE(review): ``spark`` is not defined in this cell; it is assumed to be
    provided by the pipeline runtime. With ANSI mode on (the Spark 4
    default), a malformed value makes the cast / ``to_date`` raise instead
    of yielding NULL. Such a row would then fail the update rather than be
    dropped by the expectation. Consider ``try_cast`` if bronze stores raw
    strings. TODO: confirm the bronze column types.
    """
    bronze = spark.read.table('stock_prices.source.stock_prices_bronze')
    renamed = bronze.withColumnsRenamed(_STOCK_PRICES_SILVER_RENAMES)
    # None of these expressions reads a column another one replaces, so a
    # single withColumns is equivalent to applying them one at a time.
    typed = renamed.withColumns({
        'trade_date': to_date(col('trade_date')),
        **{
            name: col(name).cast(DoubleType())
            for name in _STOCK_PRICES_SILVER_DOUBLES
        },
    })
    return typed.select(*_STOCK_PRICES_SILVER_RENAMES.values())