### Dask and Pandas
Pandas is a very popular and powerful framework for analyzing structured data, but its biggest limitation is that it was not designed with scalability in mind. Pandas is exceptionally well suited for handling small structured datasets and is highly optimized to perform fast and efficient operations on data stored in memory.

This is where Dask’s DataFrame API comes in. It provides a wrapper around Pandas that splits a huge DataFrame into smaller pieces and spreads them across a cluster of workers, so operations on large datasets can be completed more quickly and more robustly. The pieces of the DataFrame that Dask oversees are called partitions. Each partition is a relatively small Pandas DataFrame that can be dispatched to any worker and maintains its full lineage. Using Pandas alone, the whole dataset would be loaded into memory and worked on sequentially, typically on a single core. Dask, on the other hand, can split the data into multiple partitions, allowing the workload to be parallelized.

![image](https://drive.google.com/uc?id=1kde4l6iPTUAc9oSGvzzhXkl8lIyz94IZ)

### Managing DataFrame partitioning
Since partitioning can have such a significant impact on performance, it is worth controlling it explicitly. For example, when reading in data using the read_csv method
of Dask DataFrames, the default partition size is 64 MB each (this is also known as the default blocksize).
If you want a DataFrame with a specific number of partitions instead, you can pass the npartitions argument to constructors that accept it, such as from_pandas.
##### Convert the Pandas DataFrame to a Dask DataFrame with a specified number of partitions.
peopleDaskDataFrame = daskDataFrame.from_pandas(peoplePandasDataFrame,<B>npartitions=2</B>) 

##### map_partitions generally applies a given function to each partition.
print(people_filtered.<B>map_partitions(len)</B>.compute())

### Using Dask Datatypes
Dask infers datatypes from a sample of the data to avoid scanning the entire (potentially massive) DataFrame.
When we are dealing with big data, this approach may break down.

1 : It may identify integer columns as object columns because of a large number of NaN values. Dask will then throw an exception once it begins to work on a computation.

C:\Users\vishal\Anaconda3\lib\site-packages\dask\local.py:270: DtypeWarning: Columns (18,38) have mixed types. Specify dtype option on import or set low_memory=False.

2 : If you’re working on a computer with a 64-bit CPU and running a 64-bit OS, numeric columns will typically default to 64-bit types (int64, float64), so every integer takes 64 bits of memory. The advantage of using smaller datatypes where appropriate is that you can hold more data in RAM and the CPU’s cache at one time, leading to faster, more efficient computations. This means that when creating a schema for your data, you should generally choose the smallest datatype that can hold every value in the column.
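To make the memory argument concrete, here is a minimal sketch using pandas and NumPy only. The column of small integer codes is hypothetical; the point is just the ratio between the int64 and uint16 footprints.

```python
import numpy as np
import pandas as pd

# Hypothetical column of violation codes, all small non-negative integers.
codes_64 = pd.Series(np.arange(1000, dtype=np.int64) % 100)

# Check the value range before downcasting so nothing silently overflows.
fits_uint16 = (
    codes_64.min() >= np.iinfo(np.uint16).min
    and codes_64.max() <= np.iinfo(np.uint16).max
)

codes_16 = codes_64.astype(np.uint16) if fits_uint16 else codes_64

# memory_usage(index=False) reports the bytes used by the values only.
bytes_64 = codes_64.memory_usage(index=False)
bytes_16 = codes_16.memory_usage(index=False)
print(bytes_64, bytes_16)
```

The downcast column needs a quarter of the memory (2 bytes per value instead of 8), and the range check guards against picking a type that is too small.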

print (dsf.columns)
print (dsf._meta.dtypes)

![image](https://drive.google.com/uc?id=1P5UxWl8HVBruPs5HeMmkZ_xTKOu_-0UR)
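Why sample-based inference can go wrong is easy to reproduce on a tiny scale. The sketch below uses plain pandas as a stand-in: reading only the first rows plays the role of the sample, and it is not a description of Dask's internal sampling. The CSV content and column names are made up.

```python
import io

import pandas as pd

# Hypothetical CSV: the 'code' column looks like integers in the first rows,
# but has a missing value further down.
csv_text = "code,plate\n1,AAA\n2,BBB\n3,CCC\n,DDD\n5,EEE\n"

# Inferring from only the first rows (a stand-in for sample-based inference).
sampled_dtype = pd.read_csv(io.StringIO(csv_text), nrows=3)["code"].dtype

# Inferring from the whole file: the missing value forces a float column.
full_dtype = pd.read_csv(io.StringIO(csv_text))["code"].dtype
print(sampled_dtype, full_dtype)

# Stating the schema up front removes the guesswork. 'Int64' is pandas'
# nullable integer type, which can hold missing values.
explicit = pd.read_csv(io.StringIO(csv_text), dtype={"code": "Int64", "plate": str})
print(explicit.dtypes)
```

The sample and the full file disagree about the type of the same column, which is the kind of mismatch that surfaces as an error once a computation actually runs. Passing an explicit dtype mapping avoids it.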

### Limitations of Dask DataFrames
1 : Dask DataFrames do not expose the entire Pandas API, even though Dask DataFrames are made up of smaller Pandas DataFrames.
For example, functions that would alter the structure of the DataFrame in place, such as insert and pop, are not supported because Dask DataFrames are <B>immutable.</B>

2 : The second limitation is with relational-type operations, such as join/merge, groupby, and rolling. Although these operations are supported, they are likely to involve a lot of shuffling, making them performance bottlenecks. This can be minimized, again, either by using Dask to prepare a smaller dataset that can be dumped into Pandas, or by limiting these operations to only use the index.

### Summary
Dask DataFrames consist of rows (axis 0), columns (axis 1), and an index.
DataFrame methods tend to operate row-wise by default.

Inspecting how a DataFrame is partitioned can be done by accessing the divisions attribute of a DataFrame.

Filtering a DataFrame can cause an imbalance in the size of each partition. For best performance, partitions should be roughly equal in size. It’s a good practice to repartition a DataFrame using the repartition method after filtering a large amount of data.

For best performance, DataFrames should be indexed by a logical column, partitioned by their index, and the index should be presorted.

### Reading data from different sources


<B>Relational Databases</B>

Reading data from a relational database system (RDBMS) into Dask is fairly easy.
In fact, you’re likely to find that the most tedious part of interfacing with RDBMSs is setting up and configuring your Dask
environment to do so.
Dask uses the SQLAlchemy library to interface with RDBMSs, and the pyodbc library to manage your ODBC drivers.
This means you will need to install and configure SQLAlchemy, pyodbc, and the ODBC drivers for your specific RDBMS on each machine in your cluster for Dask to work correctly.

username = 'jesse'

password = 'DataScienceRulez'

hostname = 'localhost'

database_name = 'DSAS'

odbc_driver = 'ODBC+Driver+13+for+SQL+Server'

connection_string = 'mssql+pyodbc://{0}:{1}@{2}/{3}?driver={4}'.format(username, password, hostname, database_name, odbc_driver)

data = dsf.read_sql_table('violations', connection_string, index_col='SummonsNumber', npartitions=200)
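One practical wrinkle with hand-built connection strings is that characters such as @ or / in a password break the URI. A small sketch of a helper that URL-escapes the pieces is shown below; build_mssql_uri and the sample password are hypothetical and not part of the original code, while quote_plus is the standard-library function for this kind of escaping.

```python
from urllib.parse import quote_plus


def build_mssql_uri(username, password, hostname, database_name, odbc_driver):
    """Assemble an mssql+pyodbc connection URI (hypothetical helper)."""
    return "mssql+pyodbc://{0}:{1}@{2}/{3}?driver={4}".format(
        quote_plus(username),
        quote_plus(password),  # escapes characters such as '@' and '/'
        hostname,
        database_name,
        quote_plus(odbc_driver),  # spaces become '+', as in the string above
    )


connection_string = build_mssql_uri(
    "jesse", "p@ss/word", "localhost", "DSAS", "ODBC Driver 13 for SQL Server"
)
print(connection_string)
```

The resulting string can be passed to read_sql_table in the same position as the hand-formatted one.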

<B> Hadoop and Amazon S3 </B>

HDFS and S3 are two of the most popular distributed filesystems, but they have one key difference for our purposes:

HDFS is designed to allow computations to run on the same nodes that serve up data, and S3 is not.

Amazon designed S3 as a web service dedicated solely to file storage and retrieval. There’s absolutely no way to execute application code on S3 servers.
This means that when you work with data stored in S3, you will always have to transmit partitions from S3 to a Dask worker node
in order to process them. Let’s now take a look at how we can use Dask to read data from these systems.

When reading multiple files, some files may not have the same columns, so work out the columns common to all files (common_columns) in order to read all the CSVs in one go.
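One way to obtain common_columns is to read only the header line of each file and intersect the results. The sketch below does this for local files with the standard library; find_common_columns and the demo file names are hypothetical, and files on S3 or HDFS would need to be opened through the matching filesystem library instead of open().

```python
import csv
import glob
import os
import tempfile


def find_common_columns(pattern):
    """Return the column names present in every CSV matching the glob pattern."""
    common = None
    for path in sorted(glob.glob(pattern)):
        with open(path, newline="") as handle:
            header = next(csv.reader(handle))  # only the first line is read
        if common is None:
            common = list(header)
        else:
            present = set(header)
            # Keep the order of the first file, drop names missing here.
            common = [name for name in common if name in present]
    return common or []


# Hypothetical demo files with slightly different schemas.
demo_dir = tempfile.mkdtemp()
with open(os.path.join(demo_dir, "tickets_2016.csv"), "w", newline="") as f:
    f.write("Summons Number,Plate ID,Issue Date\n1,AAA,01/01/2016\n")
with open(os.path.join(demo_dir, "tickets_2017.csv"), "w", newline="") as f:
    f.write("Summons Number,Issue Date,Violation County\n2,01/01/2017,NY\n")

common_columns = find_common_columns(os.path.join(demo_dir, "*.csv"))
print(common_columns)
```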


data = dsf.read_csv('s3://my-bucket/nyc-parking-tickets/*.csv', dtype=dtypes,usecols=common_columns) 

Our read_csv call is (again) almost exactly the same; however, we’ve prefixed the file path with s3:// to tell Dask that the data is located on an S3 filesystem, and my-bucket lets Dask know to look for the files in the S3 bucket named “my-bucket” associated with your AWS account. In order to use the S3 functionality, you must have the s3fs library installed on each Dask worker. The final requirement is that each Dask worker is properly configured for authenticating with S3. s3fs uses the boto family of libraries to communicate with S3.


data = dsf.read_csv('hdfs://localhost/nyc-parking-tickets/*.csv',dtype=dtypes, usecols=common_columns)

We have a read_csv call that should look very familiar by now. In fact, the only thing that’s changed is the file path. Prefixing the file path with hdfs:// tells Dask to look for the files on an HDFS cluster instead of the local filesystem, and localhost indicates that Dask should query the local HDFS NameNode for information on the whereabouts of the file. In this way, Dask makes it extremely easy to work with HDFS. The only additional requirement is that you install the hdfs3 library on each of your Dask workers (recent Dask releases rely on pyarrow for HDFS access instead).


<table>
  <tr>
    <th colspan="3">Reading data from different sources</th>
  </tr>
  <tr>
    <td>No</td>
    <td>File System</td>
    <td>Dask Worker Node installation</td>
  </tr>
  <tr>
    <td>1</td>
    <td>Amazon S3</td>
    <td>pip install s3fs<br>pip install boto<br></td>
  </tr>
  <tr>
    <td>2</td>
    <td>HDFS</td>
    <td>pip install hdfs3</td>
  </tr>
  <tr>
    <td>3</td>
    <td>RDBMS</td>
    <td>pip install SQLAlchemy<br>pip install pyodbc<br></td>
  </tr>
</table>

In [185]:
import numpy as np
dtypes = {
'Date First Observed': str,
'Days Parking In Effect ': str,
'Double Parking Violation': str,
'Feet From Curb': np.float32,
'From Hours In Effect': str,
'House Number': str,
'Hydrant Violation': str,
'Intersecting Street': str,
'Issue Date': str,
'Issuer Code': np.float32,
'Issuer Command': str,
'Issuer Precinct': np.float32,
'Issuer Squad': str,
'Issuing Agency': str,
'Law Section': np.float32,
'Meter Number': str,
'No Standing or Stopping Violation': str,
'Plate ID': str,
'Plate Type': str,
'Registration State': str,
'Street Code1': np.uint32,
'Street Code2': np.uint32,
'Street Code3': np.uint32,
'Street Name': str,
'Sub Division': str,
'Summons Number': np.uint32,
'Time First Observed': str,
'To Hours In Effect': str,
'Unregistered Vehicle?': str,
'Vehicle Body Type': str,
'Vehicle Color': str,
'Vehicle Expiration Date': str,
'Vehicle Make': str,
'Vehicle Year': np.float32,
'Violation Code': np.uint16,
'Violation County': str,
'Violation Description': str,
'Violation In Front Of Or Opposite': str,
'Violation Legal Code': str,
'Violation Location': str,
'Violation Post Code': str,
'Violation Precinct': np.float32,
'Violation Time': str
}

In [186]:
# Reading a dataset of approx. 2 GB in size with about 10.8 million records
import dask.dataframe as dd
import os 
from dask.diagnostics import ProgressBar 
df = dd.read_csv(r'C:\Users\vishal\Downloads\nyc-parking-tickets\*2017.csv',dtype=dtypes)
print("Number of partitions=",df.npartitions)

Number of partitions= 33


In [187]:
# Print the row count of each partition
with ProgressBar():
    print(df.map_partitions(len).compute(num_workers=8))

[########################################] | 100% Completed |  1min  8.9s
0     330844
1     330801
2     330892
3     330817
4     330888
5     330813
6     330866
7     330809
8     330815
9     330815
10    330883
11    330830
12    330877
13    330785
14    330804
15    330818
16    330776
17    330774
18    330855
19    330867
20    330820
21    330836
22    330825
23    330828
24    330809
25    330859
26    330835
27    330817
28    330839
29    330921
30    330844
31    336771
32    210395
dtype: int64


In [188]:
# Drop columns with at least 90% missing values
with ProgressBar():
    missing_values = df.isnull().sum()
    percent_missing = ((missing_values / df.index.size) * 100).compute(num_workers=4)
    columns_to_drop = list(percent_missing[percent_missing >= 90].index)
    nyc_data_clean_stage1 = df.drop(columns_to_drop, axis=1)
    print(columns_to_drop)

[########################################] | 100% Completed |  1min 20.1s
['Time First Observed', 'No Standing or Stopping Violation', 'Hydrant Violation', 'Double Parking Violation']


In [189]:
# Count of missing vehicle colors before imputing
with ProgressBar():
    missing_values = nyc_data_clean_stage1['Vehicle Color'].isnull().sum().compute(num_workers=4)
    print ("Before imputing vehicle color missing count=",missing_values)

[########################################] | 100% Completed |  1min 12.3s
Before imputing vehicle color missing count= 152342


In [190]:
# Impute missing values of the categorical variable vehicle color with the most common color
with ProgressBar():
    count_of_vehicle_colors = nyc_data_clean_stage1['Vehicle Color'].value_counts().compute()
    most_common_color = count_of_vehicle_colors.sort_values(ascending=False).index[0]
    nyc_data_clean_stage2 = nyc_data_clean_stage1.fillna({'Vehicle Color': most_common_color})

[########################################] | 100% Completed |  1min 17.8s


In [191]:
# Count of missing vehicle colors after imputing
with ProgressBar():
    missing_values = nyc_data_clean_stage2['Vehicle Color'].isnull().sum().compute(num_workers=4)
    print ("After imputing vehicle color missing count=",missing_values)


[########################################] | 100% Completed |  1min 17.2s
After imputing vehicle color missing count= 0
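For readers who want to try the drop-and-impute steps without the full dataset, here is a miniature version in plain pandas. The toy rows are invented; the logic mirrors the cells above (percentage missing, drop at 90% or more, fill with the most common value).

```python
import numpy as np
import pandas as pd

# Hypothetical miniature of the parking-ticket data.
toy = pd.DataFrame(
    {
        "Vehicle Color": ["BLK", "WH", None, "BLK", "GY", None, "BLK", "WH", "BLK", "GY"],
        "Hydrant Violation": [np.nan] * 10,  # entirely missing
        "Violation Code": [21, 38, 14, 21, 46, 21, 38, 14, 21, 46],
    }
)

# Percentage of missing values per column.
percent_missing = toy.isnull().sum() / len(toy) * 100
columns_to_drop = list(percent_missing[percent_missing >= 90].index)
toy_stage1 = toy.drop(columns=columns_to_drop)

# In pandas, value_counts() sorts by frequency (descending) by default,
# so the first index entry is the most common value.
most_common_color = toy_stage1["Vehicle Color"].value_counts().index[0]
toy_stage2 = toy_stage1.fillna({"Vehicle Color": most_common_color})

remaining_missing = int(toy_stage2["Vehicle Color"].isnull().sum())
print(columns_to_drop, most_common_color, remaining_missing)
```

Because everything here is eager pandas, no compute() call is needed; in the Dask cells the same expressions stay lazy until compute() runs.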


In [192]:
# After imputation, verify the missing values of Vehicle Color and the other columns
with ProgressBar():
    missing_values = nyc_data_clean_stage2.isnull().sum().compute(num_workers=4)
    print (missing_values)

[########################################] | 100% Completed |  1min 23.0s
Summons Number                             0
Plate ID                                 728
Registration State                         0
Plate Type                                 0
Issue Date                                 0
Violation Code                             0
Vehicle Body Type                      42711
Vehicle Make                           73050
Issuing Agency                             0
Street Code1                               0
Street Code2                               0
Street Code3                               0
Vehicle Expiration Date                    0
Violation Location                   2072400
Violation Precinct                         0
Issuer Precinct                            0
Issuer Code                                0
Issuer Command                       2062645
Issuer Squad                         2063541
Violation Time                            63
Violation County          