In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

sc=SparkContext.getOrCreate()


### RDD

In [2]:
rdd1 = sc.textFile("D:\hoangth\spark\world_count.txt")

In [3]:
rdd1.collect()

['big data is very interesting ', 'big data is one of the most trending tech']

In [4]:
rdd2 = rdd1.flatMap(lambda x: x.split(" "))

In [5]:
rdd2.collect()

['big',
 'data',
 'is',
 'very',
 'interesting',
 '',
 'big',
 'data',
 'is',
 'one',
 'of',
 'the',
 'most',
 'trending',
 'tech']

### inferschema

In [6]:
df = spark.read\
    .format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load("a.csv")

### spark read all data and infer schema

In [8]:
df.printSchema()

root
 |-- Patient Id: integer (nullable = true)
 |-- Permanent Facility Id: double (nullable = true)
 |-- Age Group: string (nullable = true)
 |-- Zip Code - 3 digits: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Ethnicity: string (nullable = true)
 |-- Length of Stay: string (nullable = true)
 |-- Type of Admission: string (nullable = true)
 |-- Patient Disposition: string (nullable = true)
 |-- CCS Diagnosis Code: string (nullable = true)
 |-- Date: date (nullable = true)



In [9]:
df1 = spark.read\
    .format("csv")\
    .option("header", "true")\
    .option("inferSchema", "false")\
    .load("a.csv")
    
### cast all data type col to string

In [10]:
df1.printSchema()

root
 |-- Patient Id: string (nullable = true)
 |-- Permanent Facility Id: string (nullable = true)
 |-- Age Group: string (nullable = true)
 |-- Zip Code - 3 digits: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Ethnicity: string (nullable = true)
 |-- Length of Stay: string (nullable = true)
 |-- Type of Admission: string (nullable = true)
 |-- Patient Disposition: string (nullable = true)
 |-- CCS Diagnosis Code: string (nullable = true)
 |-- Date: string (nullable = true)



In [11]:
df2 = spark.read\
    .format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .option("samplingRatio", "0.1")\
    .load("a.csv")
    
### spark read 10% of data

In [12]:
df2.printSchema()

root
 |-- Patient Id: integer (nullable = true)
 |-- Permanent Facility Id: double (nullable = true)
 |-- Age Group: string (nullable = true)
 |-- Zip Code - 3 digits: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Ethnicity: string (nullable = true)
 |-- Length of Stay: string (nullable = true)
 |-- Type of Admission: string (nullable = true)
 |-- Patient Disposition: string (nullable = true)
 |-- CCS Diagnosis Code: string (nullable = true)
 |-- Date: date (nullable = true)



### define schema

In [13]:
a_schema = """
            `Patient Id` INTEGER,
            `Permanent Facility Id` DOUBLE,
            `Age Group` STRING,
            `Zip Code - 3 digits` STRING,
            `Gender` STRING,
            `Race` STRING,
            `Ethnicity` STRING,
            `Length of Stay` STRING,
            `Type of Admission` STRING,
            `Patient Disposition` STRING,
            `CCS Diagnosis Code` STRING,
            `Date` DATE
            """

In [14]:
df3 = spark.read\
    .format("csv")\
    .option("header", "true")\
    .schema(a_schema)\
    .load("a.csv")

In [15]:
df.printSchema()

root
 |-- Patient Id: integer (nullable = true)
 |-- Permanent Facility Id: double (nullable = true)
 |-- Age Group: string (nullable = true)
 |-- Zip Code - 3 digits: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Ethnicity: string (nullable = true)
 |-- Length of Stay: string (nullable = true)
 |-- Type of Admission: string (nullable = true)
 |-- Patient Disposition: string (nullable = true)
 |-- CCS Diagnosis Code: string (nullable = true)
 |-- Date: date (nullable = true)



In [16]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType

In [17]:
a_schema_struct = StructType([
    StructField("Patient Id", IntegerType(), True),
    StructField("Permanent Facility Id", DoubleType(), True),
    StructField("Age Group", StringType(), True),
    StructField("Zip Code - 3 digits", StringType(), True),
    StructField("Gender", StringType(), True),
    StructField("Race", StringType(), True),
    StructField("Ethnicity", StringType(), True),
    StructField("Length of Stay", StringType(), True),
    StructField("Type of Admission", StringType(), True),
    StructField("Patient Disposition", StringType(), True),
    StructField("CCS Diagnosis Code", StringType(), True),
    StructField("Date", DateType(), True)
])

In [18]:
df3 = spark.read\
    .format("csv")\
    .option("header", "true")\
    .schema(a_schema_struct)\
    .load("a.csv")

In [20]:
df3.show()

+----------+---------------------+-----------+-------------------+------+--------------------+-----------------+--------------+-----------------+--------------------+------------------+----------+
|Patient Id|Permanent Facility Id|  Age Group|Zip Code - 3 digits|Gender|                Race|        Ethnicity|Length of Stay|Type of Admission| Patient Disposition|CCS Diagnosis Code|      Date|
+----------+---------------------+-----------+-------------------+------+--------------------+-----------------+--------------+-----------------+--------------------+------------------+----------+
|         1|               1061.0|   30 to 49|               NULL|     M|               White|Not Span/Hispanic|            21|         Elective|   Home or Self Care|               659|2017-12-20|
|         2|               1061.0|   50 to 69|                105|     M|               White| Spanish/Hispanic|             8|        Emergency|Skilled Nursing Home|                99|2017-07-31|
|         3|   