d
##![Spark Logo Tiny](https://files.training.databricks.com/images/wiki-book/general/logo_spark_tiny.png) Performing SAS DATA Step operations in Databricks


**In this lesson you:**
- Learn how to perform some of the most common DATA Step operations in Databricks
- Demonstrate examples of performing DATA steps using PySpark and Spark SQL
- Become familiar with looking up functions in Databricks and Apache Spark documentation

NOTE:
Many of the most commonly used operations are covered in more depth in the Databricks Academy course "Introduction to Apache Spark Programming with Databricks".

Other recommended courses include "Introduction to SQL on Databricks", "Applications of SQL on Databricks", and "Just Enough Python for Apache Spark".

SAS DATA steps are generally used to:
  - read input files
  - work with variables and numbers
  - merge datasets (unions and joins)
  
In the previous lesson, we used `spark.read` to read in files, without altering the variables. Now, we will demonstrate how to perform some of other common operations in Databricks.

NOTE: Any of the SAS Data Steps you are used to performing can be translated directly into Spark SQL.

Run the following cell to get started:

In [0]:
%run ./Includes/classroom-setup-2


username: rashbeats@gmail.com
working_dir:   dbfs:/user/rashbeats_gmail_com/dbacademy/sasproc
database_name: dbacademy_rashbeats_gmail_com_sasproc
Out[1]: True

Out[2]: True

## Setting up the data

In this lesson, we will be working with some global sales data. The data files have already been copied from a source directory in cloud storage to your dbfs location, `filepath`.

In [0]:
dbutils.fs.ls(filepath)

Out[3]: [FileInfo(path='dbfs:/dbfs/user/rashbeats_gmail_com/dbacademy/sasproc/rawdata/cities/', name='cities/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/dbfs/user/rashbeats_gmail_com/dbacademy/sasproc/rawdata/retailers/', name='retailers/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/dbfs/user/rashbeats_gmail_com/dbacademy/sasproc/rawdata/transactions/', name='transactions/', size=0, modificationTime=0)]

In [0]:
# Python trick! You can also use a Python list comprehension with dbutils to list all the items in the filepath
[print(item) for item in dbutils.fs.ls(filepath)]

FileInfo(path='dbfs:/dbfs/user/rashbeats_gmail_com/dbacademy/sasproc/rawdata/cities/', name='cities/', size=0, modificationTime=0)
FileInfo(path='dbfs:/dbfs/user/rashbeats_gmail_com/dbacademy/sasproc/rawdata/retailers/', name='retailers/', size=0, modificationTime=0)
FileInfo(path='dbfs:/dbfs/user/rashbeats_gmail_com/dbacademy/sasproc/rawdata/transactions/', name='transactions/', size=0, modificationTime=0)
Out[4]: [None, None, None]

In [0]:
# read each of the raw data files into a DataFrame
cities_df = spark.read.format("delta").load(filepath + '/cities')
retailers_df = spark.read.format("delta").load(filepath + '/retailers')
transactions_df = spark.read.format("delta").load(filepath + '/transactions')

You can view the schema of a DataFrame to see the data types of each variable:

In [0]:
cities_df.printSchema()

root
 |-- city_id: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- state_abv: string (nullable = true)
 |-- country: string (nullable = true)



As you saw in the previous notebook, creating temp tables from the DataFrames will allow you to query them using Spark SQL. 

NOTE: Recall that these tables will not persist after your current Spark session, meaning that they will not be available if you restart the cluster or switch to a new notebook.

In [0]:
retailers_df.createOrReplaceTempView("retailers")
transactions_df.createOrReplaceTempView("transactions")
cities_df.createOrReplaceTempView("cities")

## Performing DATA Steps

There are many references in the official documentation for how to perform these operations using PySpark or Spark SQL: 

[PySpark DataFrame functions](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.html#pyspark-sql-dataframe)   
[SQL built-in functions](https://docs.databricks.com/spark/latest/spark-sql/language-manual/sql-ref-functions-builtin.html)

We will demonstrate some of the operations you may be used to performing, and how you do these in Databricks, in the rest of this notebook.

### 1. Merge

In SAS, a Merge operation might look like:

```
data retailers_df;
 merge transactions_df;
 by retailer_id;
run;
```

In this example, we will merge `retailers_df` and `transactions_df` on the `retailer_id` field, first using PySpark, then using Spark SQL, using the [`join` function](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.join.html#pyspark.sql.DataFrame.join).

NOTE: With Spark, you do NOT have to sort datasets before performing operations on them!

In [0]:
# with PySpark
sales_df = (retailers_df.join(transactions_df, "retailer_id"))
display(sales_df)

retailer_id,retailer,us_sales,other_sales,all_sales,us_vs_world,city_id,amount,trx_id,description,transacted_at
847200066,Wal-Mart,343624000,164841000,508465000,0.6758065943575271,28424447,46.11,1249734743,Wal-Mart ccd id: 428530,2011-11-25 19:00:00
1076023740,Aldi,11728000,41794000,53522000,0.2191248458577781,1518631622,251.86,1249734744,unkn ppd id: 680848 Bern 11-25,2011-11-25 19:00:00
847200066,Wal-Mart,343624000,164841000,508465000,0.6758065943575271,1601875893,31.89,1249734745,Wal-Mart ppd id: 813297,2011-11-25 19:00:00
902350112,DineEquity,7137000,452000,7589000,0.94044011068652,1217211842,92.96,1249734746,DineEquity ppd id: 379557,2011-11-25 19:00:00
1445595477,Meijer,15689000,0,15689000,1.0,1898204901,309.37,1249734747,Meijer arc id: 773946 Dar es Salaam 11-26,2011-11-25 19:00:00
1953761884,Home Depot,74203000,8992000,83195000,0.891916581525332,1137214313,38.66,1249734748,unkn arc id: 624310 Buenos Aires,2011-11-25 19:00:00
1273066548,7-Eleven,11390000,76635000,88025000,0.129395058222096,2055198208,1543.66,1249734749,7-Eleven ccd id: 226597 Bismarck,2011-11-25 19:00:00
1318092070,Bi-Lo,10362000,0,10362000,1.0,275328977,383.93,1249734750,Bi-Lo ppd id: 1004833 11-27,2011-11-25 19:00:00
847200066,Wal-Mart,343624000,164841000,508465000,0.6758065943575271,1359730291,28.12,1249734751,Wal-Mart ccd id: 1029496,2011-11-25 19:00:00
9225731,AT&T Wireless,12960000,0,12960000,1.0,407629665,1145.21,1249734752,unkn Kingston 11-27,2011-11-25 19:00:00


### 2. Working with dates

`DATE`   
To return the current date like you would do with the SAS `today()` function, use the [function `current_date`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.current_date.html).

`TODAY`   
If you have a date in the number of seconds from unix epoch (1970-01-01 00:00:00 UTC) format, you can convert it to a string representing the datetime in a specified timezone using the [`from_unixtime` function](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.from_unixtime.html).

`YEAR`   
To extract the year from a date as an integer, like with the SAS `year()` function, use the [`year` function](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.year.html).

Create a column "year" by extracting the year from the "transacted_at" field:

In [0]:
# in PySpark:
from pyspark.sql.functions import year
display(transactions_df.select(year('transacted_at').alias('year')))

year
2011
2011
2011
2011
2011
2011
2011
2011
2011
2011


Extract various date formats from a datetime column using `date_format` and create new columns using `withColumn`:

In [0]:
import pyspark.sql.functions as F

from pyspark.sql.functions import date_format

formattedDF = (transactions_df.withColumn("date string", date_format("transacted_at", "MMMM dd, yyyy"))
  .withColumn("time string", date_format("transacted_at", "HH:mm:ss.SSSSSS"))
)
display(formattedDF)

city_id,amount,retailer_id,trx_id,description,transacted_at,date string,time string
28424447,46.11,847200066,1249734743,Wal-Mart ccd id: 428530,2011-11-25 19:00:00,"November 25, 2011",19:00:00.000000
1518631622,251.86,1076023740,1249734744,unkn ppd id: 680848 Bern 11-25,2011-11-25 19:00:00,"November 25, 2011",19:00:00.000000
1601875893,31.89,847200066,1249734745,Wal-Mart ppd id: 813297,2011-11-25 19:00:00,"November 25, 2011",19:00:00.000000
1217211842,92.96,902350112,1249734746,DineEquity ppd id: 379557,2011-11-25 19:00:00,"November 25, 2011",19:00:00.000000
1898204901,309.37,1445595477,1249734747,Meijer arc id: 773946 Dar es Salaam 11-26,2011-11-25 19:00:00,"November 25, 2011",19:00:00.000000
1137214313,38.66,1953761884,1249734748,unkn arc id: 624310 Buenos Aires,2011-11-25 19:00:00,"November 25, 2011",19:00:00.000000
2055198208,1543.66,1273066548,1249734749,7-Eleven ccd id: 226597 Bismarck,2011-11-25 19:00:00,"November 25, 2011",19:00:00.000000
275328977,383.93,1318092070,1249734750,Bi-Lo ppd id: 1004833 11-27,2011-11-25 19:00:00,"November 25, 2011",19:00:00.000000
1359730291,28.12,847200066,1249734751,Wal-Mart ccd id: 1029496,2011-11-25 19:00:00,"November 25, 2011",19:00:00.000000
407629665,1145.21,9225731,1249734752,unkn Kingston 11-27,2011-11-25 19:00:00,"November 25, 2011",19:00:00.000000


There are many, many other ways to work with dates in PySpark and Spark SQL. See the official [Databricks documentation](https://docs.databricks.com/spark/latest/dataframes-datasets/dates-timestamps.html) for more!

### 3. Working with strings

To extract a substring from a string, as in the SAS function `substr()`, use the [`substring` function](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.substring.html).

You can use the [`trim` function](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.trim.html) in PySpark to perform the same functions as `strip()` in SAS.

In [0]:
from pyspark.sql.functions import substring
display(transactions_df.select('description', substring(trim('description'), 1,8).alias('desc_short')))

description,desc_short
Wal-Mart ccd id: 428530,Wal-Mart
unkn ppd id: 680848 Bern 11-25,unkn
Wal-Mart ppd id: 813297,Wal-Mart
DineEquity ppd id: 379557,DineEqui
Meijer arc id: 773946 Dar es Salaam 11-26,Meijer
unkn arc id: 624310 Buenos Aires,unkn
7-Eleven ccd id: 226597 Bismarck,7-Eleven
Bi-Lo ppd id: 1004833 11-27,Bi-Lo
Wal-Mart ccd id: 1029496,Wal-Mart
unkn Kingston 11-27,unkn


Like the other DATA Steps, you can perform the same action as the SAS `lowcase()` function using `LOWER()` in SQL or [PySpark](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.lower.html):

In [0]:
# another way to do the same thing:
display(spark.sql("SELECT *, LOWER(description) AS desc_lower FROM transactions"))

city_id,amount,retailer_id,trx_id,description,transacted_at,desc_lower
28424447,46.11,847200066,1249734743,Wal-Mart ccd id: 428530,2011-11-25 19:00:00,wal-mart ccd id: 428530
1518631622,251.86,1076023740,1249734744,unkn ppd id: 680848 Bern 11-25,2011-11-25 19:00:00,unkn ppd id: 680848 bern 11-25
1601875893,31.89,847200066,1249734745,Wal-Mart ppd id: 813297,2011-11-25 19:00:00,wal-mart ppd id: 813297
1217211842,92.96,902350112,1249734746,DineEquity ppd id: 379557,2011-11-25 19:00:00,dineequity ppd id: 379557
1898204901,309.37,1445595477,1249734747,Meijer arc id: 773946 Dar es Salaam 11-26,2011-11-25 19:00:00,meijer arc id: 773946 dar es salaam 11-26
1137214313,38.66,1953761884,1249734748,unkn arc id: 624310 Buenos Aires,2011-11-25 19:00:00,unkn arc id: 624310 buenos aires
2055198208,1543.66,1273066548,1249734749,7-Eleven ccd id: 226597 Bismarck,2011-11-25 19:00:00,7-eleven ccd id: 226597 bismarck
275328977,383.93,1318092070,1249734750,Bi-Lo ppd id: 1004833 11-27,2011-11-25 19:00:00,bi-lo ppd id: 1004833 11-27
1359730291,28.12,847200066,1249734751,Wal-Mart ccd id: 1029496,2011-11-25 19:00:00,wal-mart ccd id: 1029496
407629665,1145.21,9225731,1249734752,unkn Kingston 11-27,2011-11-25 19:00:00,unkn kingston 11-27


## 4. Working with columns

In SAS, you use `IF/THEN/ELSE` and `WHERE` to create columns based on business logic.

In this example, we will add a new column "volume" to `retailers_df` with the value "large" if "all_sales" is greater than 30000000, otherwise "small", using the PySpark functions `withColumn`[(docs link)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.withColumn.html), `when` [(docs link)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.Column.when.html), `otherwise` [(docs link)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.Column.otherwise.html), and `col` [(docs link)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.col.html).

In [0]:
retailers_df = retailers_df.withColumn("volume", when(col("all_sales") > 30000000, "large").otherwise("small"))

display(retailers_df)

retailer_id,retailer,us_sales,other_sales,all_sales,us_vs_world,volume
2001148981,Costco,79694000,31836000,111530000,0.7145521384380884,large
1953761884,Home Depot,74203000,8992000,83195000,0.891916581525332,large
1076023740,Aldi,11728000,41794000,53522000,0.2191248458577781,large
683159064,Safeway,36330000,0,36330000,1.0,large
1654681099,Delhaize America,17069000,11348000,28417000,0.6006615758172925,small
1126742091,H-E-B Grocery,19819000,1347000,21166000,0.93636020032127,small
1245928212,Gap,13071000,3885000,16956000,0.7708775654635527,small
44686722,Ace Hardware,14299000,46000,14345000,0.9967933077727432,small
202319369,J.C. Penney,12184000,73000,12257000,0.9940442196295994,small
1647858807,Verizon Wireless,10959000,0,10959000,1.0,small


## 5. Working with data types
`INPUT` and `PUT` convert a character value into a numeric value or vice versa, in SAS. The PySpark [`cast` function](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.Column.cast.html) achieves the same.

You can read more about data types in Spark in the [docs](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#data-types).

In [0]:
# display the current data types
transactions_df.printSchema()

root
 |-- city_id: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- retailer_id: string (nullable = true)
 |-- trx_id: string (nullable = true)
 |-- description: string (nullable = true)
 |-- transacted_at: string (nullable = true)



In [0]:
# cast the `city_id` column to integer type
transactions_df.withColumn('city_id', transactions_df.city_id.cast('int')).printSchema()

root
 |-- city_id: integer (nullable = true)
 |-- amount: string (nullable = true)
 |-- retailer_id: string (nullable = true)
 |-- trx_id: string (nullable = true)
 |-- description: string (nullable = true)
 |-- transacted_at: string (nullable = true)



In [0]:
# cast the `city_id` column back to string type
transactions_df.withColumn('city_id', transactions_df.city_id.cast('string')).printSchema()

root
 |-- city_id: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- retailer_id: string (nullable = true)
 |-- trx_id: string (nullable = true)
 |-- description: string (nullable = true)
 |-- transacted_at: string (nullable = true)



## 6. Working with numbers

All of the mathematical functions you use in SAS, such as:
- `CEIL`
- `INT`
- `ROUND`
- `MEAN`, `MIN`, `MAX`

can be performed using PySpark or SQL functions. Check out [the documentation](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#functions) to see more!

Many mathematical operations are performed on aggregated objects. In Databricks, use the DataFrame `groupBy` method to create a grouped data object.

Next, group `sales_df` by "city_id" and count the number of sales for each city:

In [0]:
sales_by_city_df = sales_df.groupBy("city_id").count()
display(sales_by_city_df)

city_id,count
1126623009,97589
1744912105,97436
1607451095,287093
2008587661,97587
1932789846,97628
1985376021,287163
353140025,97297
1518631622,97580
2008300608,97256
1545959797,97570


Next, get the average amount per sale for each retailer.

NOTE: We need to cast the data type of the "amount" column from a string to a number before we can perform aggregations on it.

In [0]:
sales_df = sales_df.withColumn("amount", sales_df.amount.cast('int'))

sales_by_retailer_df = sales_df.groupBy("retailer").avg("amount")
display(sales_by_retailer_df)

retailer,avg(amount)
Alimentation Couche-Tard,368.476730477741
GameStop,372.7061335231064
Michael's,373.65415659562
SUPERVALU,370.7857261734644
Nordstrom,370.7883616625341
QVC,372.9475708265569
Walgreen,369.6818027308248
Verizon Wireless,367.4953726077711
Dick's Sporting Goods,373.4431811584386
Gap,369.9919673947452


You can use the grouped data method `agg` to apply built-in aggregate functions.

This allows you to apply other transformations on the resulting columns, such as `alias`.

We can also use the `round` function to round the average amount.

In [0]:
sales_by_city_df = sales_df.groupBy("city_id").agg(round(avg("amount")).alias("avg_sale"))

display(sales_by_city_df)

city_id,avg_sale
1126623009,370.0
1744912105,370.0
1607451095,370.0
2008587661,367.0
1932789846,366.0
1985376021,371.0
353140025,370.0
1518631622,373.0
2008300608,370.0
1545959797,365.0


In the next notebook, we will explore how to perform some common SAS PROC Steps in Databricks.