The spark cache is too large #2819

Closed
mingnet opened this issue Oct 20, 2018 · 28 comments

@mingnet

mingnet commented Oct 20, 2018

I want to know how much memory GeoTrellis needs at runtime, so I set Spark's cache policy to DISK_ONLY. I am using GeoTrellis 2.0.0 and Spark 2.3.0, I have 10 GB of data, and the Spark cluster has 3 worker nodes. The data is stored in HDFS, and I am trying to load GeoTIFF data through the ETL.
Following the documentation (https://docs.geotrellis.io/en/latest/tutorials/etl-tutorial.html), I compiled the geotrellis-spark-etl-assembly-2.0.0-SNAPSHOT.jar file.
Then I created the input.json file:

[{
    "format": "multiband-geotiff",
    "name": "china",
    "cache": "DISK_ONLY",
    "backend": {
        "type": "hadoop",
        "path": "hdfs://192.168.1.41:9000/tiletest/201705"
    }
}]

The output.json file:

{
    "backend": {
        "type": "hadoop",
        "path": "hdfs://192.168.1.41:9000/result/test/"
    },
    "reprojectMethod": "buffered",
    "pyramid": true,
    "tileSize": 256,
    "keyIndexMethod": {
        "type": "zorder"
    },
    "resampleMethod": "nearest-neighbor",
    "layoutScheme": "zoomed",
    "crs": "EPSG:3857"
}

The backend-profiles.json file:

{
    "backend-profiles": []
}

Then I run the job with the following command:

/opt/spark/bin/spark-submit --class geotrellis.spark.etl.MultibandIngest --master 'spark://192.168.1.8:7077' --driver-memory 50G /work/etl/geotrellis-spark-etl-assembly-2.0.0-SNAPSHOT.jar --input "file:///work/etl/input.json" --output "file:///work/etl/output.json" --backend-profiles "file:///work/etl/backend-profiles.json"

Then I found that the Spark cache on the three worker nodes took up 25 GB of disk space; the cache per node is 2.5 times the size of the original data.

Following this pattern, if I ingest 100 GB or 1 TB of data, it will require a huge amount of cache space. I hope I can use less space, so that I can just use memory.

I want to know what is wrong and what I need to do to improve this.

@pomadchin
Member

pomadchin commented Oct 25, 2018

Hey @mingnet, what GeoTrellis version are you using? Also, can you provide the gdalinfo output of a typical tiff from the dataset you want to ingest?

@mingnet
Author

mingnet commented Oct 30, 2018

Hey @pomadchin, I need to correct the numbers in my question, because I ignored the compression option on the tiff data. The size above is for GeoTIFF data processed with gdal_retile.py using the -co compress=deflate option.

I tried turning that option off. The resources used during the process were as follows:
24 GB of tiff data: each Spark worker node used 25 GB of disk space.
8 GB of tiff data: each Spark worker node used 10 GB of disk space.

In other words, each node must have at least as much memory or disk space as the input data. I don't think this is very reasonable. The data should be evenly distributed across the 3 nodes, or even use fewer resources, because it should be possible to generate only one piece of data for the target grid at a time.

Suppose we need to process 1 TB of data: that would be very difficult.

The GeoTrellis version is 2.0.0.

gdalinfo (before gdal_retile.py processing):

Driver: GTiff/GeoTIFF
Files: /data/remotesensing/201704/E116D8_N36D0_20170417_GF1_WFV2_DOM.tif
Size is 18315, 15387
Coordinate System is:
GEOGCS["WGS 84",
    DATUM["WGS_1984",
        SPHEROID["WGS 84",6378137,298.2572235604902,
            AUTHORITY["EPSG","7030"]],
        AUTHORITY["EPSG","6326"]],
    PRIMEM["Greenwich",0],
    UNIT["degree",0.0174532925199433],
    AUTHORITY["EPSG","4326"]]
Origin = (115.441238182399985,37.067668339199997)
Pixel Size = (0.000144000000000,-0.000144000000000)
Metadata:
  AREA_OR_POINT=Area
Image Structure Metadata:
  INTERLEAVE=PIXEL
Corner Coordinates:
Upper Left  ( 113.4414382,  36.0686683) (115d26'28.46"E, 37d 4' 3.61"N)
Lower Left  ( 115.4412382,  34.8519403) (115d26'28.46"E, 34d51' 6.99"N)
Upper Right ( 117.0788982,  37.0676683) (118d 4'42.95"E, 37d 4' 3.61"N)
Lower Right ( 119.0785982,  34.8519403) (118d 4'42.95"E, 34d51' 6.99"N)
Center      ( 116.9599182,  38.6598043) (116d45'35.71"E, 35d57'35.30"N)
Band 1 Block=18315x1 Type=UInt16, ColorInterp=Gray
Band 2 Block=18315x1 Type=UInt16, ColorInterp=Undefined
Band 3 Block=18315x1 Type=UInt16, ColorInterp=Undefined
Band 4 Block=18315x1 Type=UInt16, ColorInterp=Undefined
Driver: GTiff/GeoTIFF
Files: /data/remotesensing/201704/E114D2_N34D7_20170417_GF1_WFV1_DOM.tif
Size is 21192, 15671
Coordinate System is:
GEOGCS["WGS 84",
    DATUM["WGS_1984",
        SPHEROID["WGS 84",6378137,298.2572235604902,
            AUTHORITY["EPSG","7030"]],
        AUTHORITY["EPSG","6326"]],
    PRIMEM["Greenwich",0],
    UNIT["degree",0.0174532925199433],
    AUTHORITY["EPSG","4326"]]
Origin = (112.548738298100005,35.793564391800004)
Pixel Size = (0.000144000000000,-0.000144000000000)
Metadata:
  AREA_OR_POINT=Area
Image Structure Metadata:
  INTERLEAVE=PIXEL
Corner Coordinates:
Upper Left  ( 122.5487383,  35.1935644) (112d32'55.46"E, 35d47'36.83"N)
Lower Left  ( 132.5457383,  33.2369404) (112d32'55.46"E, 33d32'12.99"N)
Upper Right ( 115.6303863,  35.9935644) (115d36' 1.39"E, 35d47'36.83"N)
Lower Right ( 115.6603863,  33.4369404) (115d36' 1.39"E, 33d32'12.99"N)
Center      ( 114.0747623,  34.8652524) (114d 4'28.42"E, 34d39'54.91"N)
Band 1 Block=21192x1 Type=UInt16, ColorInterp=Gray
Band 2 Block=21192x1 Type=UInt16, ColorInterp=Undefined
Band 3 Block=21192x1 Type=UInt16, ColorInterp=Undefined
Band 4 Block=21192x1 Type=UInt16, ColorInterp=Undefined
Driver: GTiff/GeoTIFF
Files: /data/remotesensing/201704/E116D3_N34D3_20170417_GF1_WFV2_DOM.tif
Size is 17916, 15376
Coordinate System is:
GEOGCS["WGS 84",
    DATUM["WGS_1984",
        SPHEROID["WGS 84",6378137,298.2572235604902,
            AUTHORITY["EPSG","7030"]],
        AUTHORITY["EPSG","6326"]],
    PRIMEM["Greenwich",0],
    UNIT["degree",0.0174532925199433],
    AUTHORITY["EPSG","4326"]]
Origin = (115.032904865400013,35.398584405899996)
Pixel Size = (0.000144000000000,-0.000144000000000)
Metadata:
  AREA_OR_POINT=Area
Image Structure Metadata:
  INTERLEAVE=PIXEL
Corner Coordinates:
Upper Left  ( 112.0329049,  35.3485844) (115d 1'58.46"E, 35d23'54.90"N)
Lower Left  ( 114.0329049,  33.1644404) (115d 1'58.46"E, 33d11' 3.99"N)
Upper Right ( 127.6128089,  35.7985844) (117d36'46.11"E, 35d23'54.90"N)
Lower Right ( 127.6128089,  33.5844404) (117d36'46.11"E, 33d11' 3.99"N)
Center      ( 116.3248569,  34.2975124) (116d19'22.28"E, 34d17'29.44"N)
Band 1 Block=17916x1 Type=UInt16, ColorInterp=Gray
Band 2 Block=17916x1 Type=UInt16, ColorInterp=Undefined
Band 3 Block=17916x1 Type=UInt16, ColorInterp=Undefined
Band 4 Block=17916x1 Type=UInt16, ColorInterp=Undefined
Driver: GTiff/GeoTIFF
Files: /data/remotesensing/201704/E114D6_N36D3_20170417_GF1_WFV1_DOM.tif
Size is 21660, 15665
Coordinate System is:
GEOGCS["WGS 84",
    DATUM["WGS_1984",
        SPHEROID["WGS 84",6378137,298.2572235604902,
            AUTHORITY["EPSG","7030"]],
        AUTHORITY["EPSG","6326"]],
    PRIMEM["Greenwich",0],
    UNIT["degree",0.0174532925199433],
    AUTHORITY["EPSG","4326"]]
Origin = (112.903738283899969,37.463533658299994)
Pixel Size = (0.000144000000000,-0.000144000000000)
Metadata:
  AREA_OR_POINT=Area
Image Structure Metadata:
  INTERLEAVE=PIXEL
Corner Coordinates:
Upper Left  ( 122.9067383,  37.4335337) (112d54'13.46"E, 37d27'48.72"N)
Lower Left  ( 122.9047383,  35.2777737) (112d54'13.46"E, 35d12'27.99"N)
Upper Right ( 116.0427783,  37.6635337) (116d 1'22.00"E, 37d27'48.72"N)
Lower Right ( 116.0627783,  35.4077737) (116d 1'22.00"E, 35d12'27.99"N)
Center      ( 114.4633583,  36.6356537) (114d27'47.73"E, 36d20' 8.35"N)
Band 1 Block=21660x1 Type=UInt16, ColorInterp=Gray
Band 2 Block=21660x1 Type=UInt16, ColorInterp=Undefined
Band 3 Block=21660x1 Type=UInt16, ColorInterp=Undefined
Band 4 Block=21660x1 Type=UInt16, ColorInterp=Undefined

@pomadchin
Member

pomadchin commented Oct 30, 2018

All right, that's what I thought. The reason you need such heavy nodes is that you have a tiff with 15665 segments, and each segment is a 21660x1 strip (you have a striped tiff), so chunking this tiff into squares requires a lot of memory. I'll prepare a more detailed answer a bit later, but the main problem here is that it's very hard to read your tiff in a windowed fashion. Also, since you compressed the source tiff with deflate, there is a rather heavy CPU load during the ingest just to read the data into Spark memory.

Before changing any of the ingest settings, I'd suggest bumping the GeoTrellis version up to 2.1 and retiling the tiff so it has square segments (256x256).
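
For the version bump, a minimal build.sbt sketch, assuming the assembly is built from an sbt project that depends on the spark-etl module (the module list and project setup here are illustrative, not taken from this issue):

// build.sbt (sketch): bump the GeoTrellis dependencies from 2.0.0 to 2.1.0
libraryDependencies ++= Seq(
  "org.locationtech.geotrellis" %% "geotrellis-spark"     % "2.1.0",
  "org.locationtech.geotrellis" %% "geotrellis-spark-etl" % "2.1.0"
)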

@mingnet
Author

mingnet commented Oct 30, 2018

I will try your suggestion.

However, I have a concern. If the tile size is set to 256, a large amount of data will produce a huge number of files, so the file system's indexing efficiency will be very low. I usually retile tiffs into square segments of 8192x8192.

What is your opinion on this?

@pomadchin
Member

pomadchin commented Oct 30, 2018

@mingnet ah, just to be clear, I'm talking about the segment layout; it creates no extra files, just a single tiff, though its internal segment representation will be different. Try this:

# you can add any compression (optionally)
gdal_translate -co "TILED=YES" -co "COMPRESS=method" input.tiff output.tiff

and after that check the gdalinfo of the output tiff. You'll see that the block size of the tiff is 256x256. You don't need to do any retiling.
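
As a quick check (a generic sketch; output.tiff stands in for whatever file the command above produced), the band lines reported by gdalinfo should now show square blocks:

gdalinfo output.tiff | grep "Block="
# expected to print lines like:
# Band 1 Block=256x256 Type=UInt16, ColorInterp=Gray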

@mingnet
Author

mingnet commented Oct 30, 2018

@pomadchin thanks. I will try to do this.

@pomadchin
Member

@mingnet hm, so what do you mean by a large number of files?

@mingnet
Author

mingnet commented Oct 30, 2018

@pomadchin very, very many small files, maybe millions. I thought each segment would be split out into its own tiff file.

@pomadchin
Member

pomadchin commented Oct 30, 2018

@mingnet Gotcha! So each tiff consists of segments, which lets us read them in a streaming fashion.

The way we read tiffs into memory is something like solving a bin-packing problem. We try to pack tiff segments into spatially co-located windows (256x256 by default), and those windows make up a partition limited to 128 MB by default. The partitioning scheme is then a set of windows distributed across the Spark cluster, where each window is ~256x256 and each partition is ~128 MB. This approach does not require reading the whole tiff into memory just to read a small window.
During the read you receive N partitions filled with (Extent, Tile) pairs, where, in the best case, a Tile is just a tiff segment. This produces a cheap and well-distributed RDD partitioning scheme.

If your tiff contains large segments, or large strip segments (as in your case), we still try to pack those large segments into partitions, and then we have to shuffle a lot of data to crop out square windows in the next step, during creation of the tiled layout RDD (obviously, strips are not square, and to produce a square window we need to read the entire set of strip segments and crop them in the memory of a single node).
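
For reference, a minimal sketch of this windowed read, assuming the GeoTrellis 2.x HadoopGeoTiffRDD API (the path and the option values are illustrative, not taken from this issue):

import geotrellis.raster.MultibandTile
import geotrellis.spark.io.hadoop.HadoopGeoTiffRDD
import geotrellis.vector.ProjectedExtent
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Read a GeoTIFF as windows instead of whole files: segments are packed
// into ~256x256 windows, and windows into ~128 MB partitions.
def readWindowed(path: String)(implicit sc: SparkContext): RDD[(ProjectedExtent, MultibandTile)] =
  HadoopGeoTiffRDD.spatialMultiband(
    new Path(path),
    HadoopGeoTiffRDD.Options(
      maxTileSize = Some(256),                  // target window size in pixels
      partitionBytes = Some(128L * 1024 * 1024) // target partition size in bytes
    )
  )

With square 256x256 segments each window is close to a single segment, so little shuffling is needed; with long strips each square window has to be assembled from many strip segments, which is where the extra memory, disk, and shuffle usage comes from.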

@mingnet
Author

mingnet commented Oct 31, 2018

@pomadchin I have done what you said, but the result is still the same as before.
I don't understand why this is happening. I will upload the data and files to AWS S3. Could you use my data to check this?
I am uploading the data now and should be done tomorrow. If you are willing to test this, I will send you the AWS S3 address.

@pomadchin
Member

@mingnet sounds good 👍 Also a follow-up question: can you post the gdalinfo of the newly generated file?

@pomadchin pomadchin added the bug label Oct 31, 2018
@pomadchin pomadchin changed the title from "the spark cache is too large" to "The spark cache is too large" Oct 31, 2018
@mingnet
Author

mingnet commented Nov 1, 2018

@pomadchin I have uploaded the data to AWS S3. The address is s3://gtdatatest; this bucket contains all the files. You can download and test it. My Spark is deployed in standalone mode.
gdalinfo (one of the tiff files):

Driver: GTiff/GeoTIFF
Files: /data1/data20181031/a_r_1_1.tif
Size is 8192, 8192
Coordinate System is:
GEOGCS["WGS 84",
DATUM["WGS_1984",
SPHEROID["WGS 84",6378137,298.257223563,
AUTHORITY["EPSG","7030"]],
AUTHORITY["EPSG","6326"]],
PRIMEM["Greenwich",0],
UNIT["degree",0.0174532925199433],
AUTHORITY["EPSG","4326"]]
Origin = (113.533894000000004,-26.867094999999999)
Pixel Size = (0.000144000000000,-0.000144000000000)
Metadata:
AREA_OR_POINT=Area
Image Structure Metadata:
INTERLEAVE=PIXEL
Corner Coordinates:
Upper Left ( 113.5338940, -26.8670950) (113d32' 2.02"E, 26d52' 1.54"S)
Lower Left ( 113.5338940, -28.0467430) (113d32' 2.02"E, 28d 2'48.27"S)
Upper Right ( 114.7135420, -26.8670950) (114d42'48.75"E, 26d52' 1.54"S)
Lower Right ( 114.7135420, -28.0467430) (114d42'48.75"E, 28d 2'48.27"S)
Center ( 114.1237180, -27.4569190) (114d 7'25.38"E, 27d27'24.91"S)
Band 1 Block=256x256 Type=UInt16, ColorInterp=Gray
Band 2 Block=256x256 Type=UInt16, ColorInterp=Undefined
Band 3 Block=256x256 Type=UInt16, ColorInterp=Undefined
Band 4 Block=256x256 Type=UInt16, ColorInterp=Undefined

@pomadchin
Member

Btw @mingnet, have you tried disabling caching? "cache": "NONE"
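
For context, the "cache" field controls how the intermediate ingest RDDs are persisted; a minimal Spark sketch of what the two settings mean for executor disk usage (treating the field as a plain Spark StorageLevel, and using a hypothetical RDD named tiles, are assumptions for illustration):

import org.apache.spark.storage.StorageLevel

// "DISK_ONLY" writes every cached partition to the executors' local disks
// (SPARK_LOCAL_DIRS), which is the space being observed in this issue.
val diskOnly = StorageLevel.fromString("DISK_ONLY")

// "NONE" skips persistence entirely: partitions are recomputed when needed
// and no cache blocks are written to memory or disk.
val none = StorageLevel.NONE

// tiles.persist(diskOnly)  // trades local disk space for avoided recomputation
// tiles.unpersist()        // frees cached blocks once they are no longer needed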

@mingnet
Author

mingnet commented Nov 2, 2018

@pomadchin I just tested this and found that it is still the same as before. Have you used my data to test this problem?

@pomadchin
Member

pomadchin commented Nov 2, 2018

Yea @mingnet, thanks for that; we've scheduled looking into it for this sprint (it starts today)! Thanks!

@pomadchin
Member

pomadchin commented Nov 2, 2018

Ah, can you give us access to your bucket? Maybe make it public for a while, or contact me on Gitter and we'll copy it into our internal buckets.

@mingnet
Author

mingnet commented Nov 2, 2018

@pomadchin ok, now you can access my bucket: s3://gtdatatest
I had forgotten to change the permissions.

@pomadchin
Member

Ok, I can list objects at the moment, but I can't download them to a local disk or copy them into our bucket: when calling the {UploadPartCopy | GetObject} operation: Access Denied

@mingnet
Author

mingnet commented Nov 2, 2018

Sorry, I am not proficient with S3; let me try again.
Now, please try again. @pomadchin

@pomadchin
Member

pomadchin commented Nov 2, 2018

@mingnet done! You can close the public access. Thanks, I will let you know once we have any clues.

A short summary:

input.json:

[{
    "format": "multiband-geotiff",
    "name": "china",
    "cache": "DISK_ONLY",
    "backend": {
        "type": "hadoop",
        "path": "hdfs://192.168.1.41:9000/tiletest/201705"
    }
}]

output.json:

{
    "backend": {
        "type": "hadoop",
        "path": "hdfs://192.168.1.41:9000/result/test/"
    },
    "reprojectMethod": "buffered",
    "pyramid": true,
    "tileSize": 256,
    "keyIndexMethod": {
        "type": "zorder"
    },
    "resampleMethod": "nearest-neighbor",
    "layoutScheme": "zoomed",
    "crs": "EPSG:3857"
}

backend-profiles.json:

{
    "backend-profiles": []
}

Reported disk usage:
24 GB of tiff data: each Spark worker node used 25 GB of disk space.
8 GB of tiff data: each Spark worker node used 10 GB of disk space.

What is the full spark-submit command you use, and what are your cluster parameters, including the current Spark cluster settings and the number of cores/executors you allocate for the job?

Also, do you have a screenshot from the Spark UI with all the steps?

@mingnet
Author

mingnet commented Nov 2, 2018

Spark is deployed in standalone mode. It has 3 worker nodes. The distribution is spark-2.3.0-bin-hadoop2.7.

Command:
spark-submit --class geotrellis.spark.etl.MultibandIngest --master 'spark://192.168.1.8:7077' --driver-memory 50G /work/etl/geotrellis-spark-etl-assembly-2.1.0.jar --input "file:///work/etl/input.json" --output "file:///work/etl/output.json" --backend-profiles "file:///work/etl/backend-profiles.json"

spark-defaults.conf

spark.executor.extraJavaOptions -Xss32m
spark.driver.extraJavaOptions=-Xss32m

PYSPARK_PYTHON=/opt/python3.7

spark-env.sh

export JAVA_HOME=/usr/lib/jvm/jre

export SCALA_HOME=/opt/scala

export HADOOP_HOME=/opt/hadoop

export HADOOP_CONF_DIR=/opt/hadoop/etc/hadoop

export SPARK_MASTER_HOST=master

export SPARK_MASTER_PORT=7077

export SPARK_LOCAL_DIRS=/data1

export SPARK_WORKER_MEMORY=50g

export SPARK_EXECUTOR_CORES=3
export SPARK_CORES_MAX=6

export SPARK_DRIVER_MEMORY=50g
export SPARK_EXECUTOR_MEMORY=50g
export PYSPARK_PYTHON=/opt/python3.7/bin/python3

@pomadchin
Member

pomadchin commented Nov 2, 2018

Ok, I'm wondering: do you see in the UI that the data is distributed across the cluster? (Just a double check to confirm the behaviour, since it looks very much like everything is being processed on the driver.)

@mingnet
Author

mingnet commented Nov 2, 2018

I don't quite understand which one you are talking about. Is it the following data?

ExecutorID | Worker | Cores | Memory (MB) | State | Logs
--- | --- | --- | --- | --- | ---
11 | worker-20181019165435-192.168.1.165-42882 | 3 | 51200 | EXITED | stdout stderr
1 | worker-20181019165435-192.168.1.165-42882 | 3 | 51200 | EXITED | stdout stderr
3 | worker-20181102141149-192.168.1.165-45754 | 3 | 51200 | EXITED | stdout stderr
17 | worker-20181102141149-192.168.1.8-36191 | 3 | 51200 | EXITED | stdout stderr
2 | worker-20181019165432-192.168.1.8-46594 | 3 | 51200 | EXITED | stdout stderr
13 | worker-20181102141149-192.168.1.8-36191 | 3 | 51200 | EXITED | stdout stderr
6 | worker-20181102141149-192.168.1.8-36191 | 3 | 51200 | EXITED | stdout stderr
16 | worker-20181102141149-192.168.1.8-36191 | 3 | 51200 | EXITED | stdout stderr
9 | worker-20181102141149-192.168.1.35-36166 | 3 | 51200 | EXITED | stdout stderr
12 | worker-20181102141149-192.168.1.8-36191 | 3 | 51200 | EXITED | stdout stderr
4 | worker-20181102141149-192.168.1.35-36166 | 3 | 51200 | EXITED | stdout stderr
0 | worker-20181102141149-192.168.1.8-36191 | 3 | 51200 | EXITED | stdout stderr
8 | worker-20181019165432-192.168.1.8-46594 | 3 | 51200 | EXITED | stdout stderr


@pomadchin
Member

pomadchin commented Nov 2, 2018

Kind of; though what I asked for was just the web UI page (spark-driver:4040).

@pomadchin
Member

pomadchin commented Nov 16, 2018

I ran an ingest on the a_r_1_1.tif tiff:

[Spark UI screenshots omitted]

The config is the same as yours; I ingested from S3 to S3.
Launch options:

spark-submit,--master,yarn-cluster,\
--class,geotrellis.spark.etl.MultibandIngest,\
--driver-memory,4200M,\
--driver-cores,2,\
--executor-memory,4200M,\
--executor-cores,2,\
--conf,spark.dynamicAllocation.enabled=true,\
--conf,spark.yarn.executor.memoryOverhead=700,\
--conf,spark.yarn.driver.memoryOverhead=700,\
s3://geotrellis-test/daunnc/geotrellis-spark-etl-assembly-${GEOTRELLIS_VERSION}${GEOTRELLIS_VERSION_SUFFIX}.jar,\
--input,"s3://geotrellis-test/issue-2819/cfg/input.json",\
--output,"s3://geotrellis-test/issue-2819/cfg/output.json",\
--backend-profiles,"s3://geotrellis-test/issue-2819/cfg/backend-profiles.json"

input.json:

[{
  "format": "multiband-geotiff",
  "name": "china",
  "cache": "NONE",
  "backend": {
    "type": "s3",
    "path": "s3://bucket/issue-2819/data/a_r_1_1.tif"
  }
}]

output.json:

{
  "backend": {
    "type": "s3",
    "path": "s3://bucket/issue-2819/catalog/"
  },
  "reprojectMethod": "buffered",
  "pyramid": true,
  "tileSize": 256,
  "keyIndexMethod": {
    "type": "zorder"
  },
  "resampleMethod": "nearest-neighbor",
  "layoutScheme": "zoomed",
  "crs": "EPSG:3857"
}

master: m3.xlarge
2xworkers: m3.xlarge

Everything looks fine; now trying the ingest of all the images.

@pomadchin
Member

pomadchin commented Nov 16, 2018

All right, I noticed that:

[Spark UI screenshots omitted]

where the total size of the input dataset is 14.0 GiB.

@pomadchin pomadchin removed the bug label Nov 19, 2018
@pomadchin
Member

pomadchin commented Nov 19, 2018

Hey @mingnet, I added more nodes (10x m3.xlarge nodes), and measured the DISK space usage:

[cluster disk usage screenshot omitted]

Cluster disk usage for this task (p.s. I launched this job with dynamic allocation enabled):

[Spark UI screenshots omitted]

So the result is that this is expected behaviour, and the usage is roughly constant across the entire cluster. Disk space problems can be resolved by adding more nodes; it does not mean that each node needs disk space equal to the input data size.

I'd recommend you test it more; we actually haven't had issues ingesting large datasets :/

Also, it's only a temporary disk space occupation required for Spark's work. Along with that, don't forget to clean up the Spark work dirs manually after finishing your job.
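
In standalone mode this cleanup can also be automated on the workers; a minimal spark-env.sh sketch using Spark's standalone cleanup properties (the interval and TTL values here are illustrative):

# spark-env.sh on each worker: periodically purge work dirs of stopped applications
export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.interval=1800 -Dspark.worker.cleanup.appDataTtl=86400"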

Let me know if this explanation is not good enough, and feel free to close this issue or keep it open.
I'm removing the bug label since it doesn't seem to be a bug.

@pomadchin
Member

Hey @mingnet, I'm closing this issue, but feel free to re-open it if there are any more problems, or you can create a new one. 👍
