Download from SDSSDR18 all spectra from the match

In [1]:
import os
import time
import numpy as np
import pandas as pd
from astroquery.sdss import SDSS
from astropy.io import fits
from astropy.table import Table

In [14]:
crossmatch = pd.read_csv('/Users/tharacaba/Downloads/ProjectBig/gaia_results_all_batches.csv')
crossmatch

Unnamed: 0,SOURCE_ID,original_ext_source_id
0,3647851003078664832,1237648703516442877
1,3661958424457651712,1237648704049840359
2,3662377883848390912,1237648704586973290
3,4419851136648786304,1237648705670152326
4,4407884120813978752,1237648705678213667
...,...,...
133828,1779912581508627072,1237680273108960003
133829,309194386402373120,1237680285996155390
133830,1883704073988238336,1237680303715582376
133831,2866659811294043904,1237680308016841309


In [15]:
best_obj_ids = crossmatch['original_ext_source_id'].unique()
batch = best_obj_ids[:50]  # Test with a very small batch
batch_ids = ','.join(map(str, batch))

query = f"""
SELECT plate, mjd, fiberid, bestObjID
FROM SpecObj
WHERE bestObjID IN ({batch_ids})
"""
result = SDSS.query_sql(query)
if result is not None:
    spec_df = result.to_pandas()
    print(spec_df)

    plate    mjd  fiberid            bestObjID
0     303  51615      205  1237648703516442877
1     299  51671      179  1237648704049840359
2     300  51943      309  1237648704586973290
3     311  51665      443  1237648705670152326
4     364  52000      380  1237648705678213667
5     267  51608      300  1237648720142663801
6     474  52000      142  1237648720677044549
7     290  51941       21  1237648720698277967
8     271  51883      200  1237648721219551336
9     279  51984      396  1237648721225711706
10    292  51609      589  1237648721773396235
11    305  51613      609  1237648721784078676
12    308  51662      546  1237648721786175579
13    312  51689      471  1237648721789124738
14    503  51999       85  1237648722830557410
15    285  51930      579  1237648722841501766
16    419  51879      492  1237649920574750877
17    463  51908      534  1237649963532353548
18    461  51910      361  1237649964066930743
19    330  52370      257  1237650369407352875
20    331  52

In [16]:
# Initialize an empty DataFrame to store results
spec_df = pd.DataFrame()

# Define a small batch size for querying
batch_size = 50  # Small batch size to avoid exceeding query limits
max_retries = 3  # Number of retries for failed batches

# Process bestObjIDs in small batches
for batch_index, i in enumerate(range(0, len(best_obj_ids), batch_size), start=1):
    batch = best_obj_ids[i:i + batch_size]
    batch_ids = ','.join(map(str, batch))  # Properly format IDs
    
    query = f"""
    SELECT plate, mjd, fiberid, bestObjID
    FROM SpecObj
    WHERE bestObjID IN ({batch_ids})
    """
    
    retries = 0
    success = False
    
    while retries < max_retries and not success:
        try:
            # Query the SDSS database
            result = SDSS.query_sql(query)
            if result is not None:
                # Append results to the DataFrame
                spec_df = pd.concat([spec_df, result.to_pandas()], ignore_index=True)
            
            # Print progress every 100 batches
            if batch_index % 100 == 0 or batch_index == 1:
                print(f"Processed batch {batch_index}/{len(best_obj_ids) // batch_size + 1}.")
            
            success = True  # Mark this batch as successful
        except Exception as e:
            retries += 1
            print(f"Error processing batch {i} to {i + len(batch)}: {e}. Retrying ({retries}/{max_retries})...")
            time.sleep(2)  # Add a short delay before retrying

    if not success:
        print(f"Batch {i} to {i + len(batch)} failed after {max_retries} retries.")

# Save the results to a file
spec_df.to_csv("specobj_results.csv", index=False)
print("Query completed. Results saved to 'specobj_results.csv'.")

Processed batch 1/2676.
Processed batch 100/2676.
Processed batch 200/2676.
Processed batch 300/2676.
Processed batch 400/2676.
Processed batch 500/2676.
Processed batch 600/2676.
Processed batch 700/2676.
Processed batch 800/2676.
Processed batch 900/2676.
Processed batch 1000/2676.
Processed batch 1100/2676.
Processed batch 1200/2676.
Processed batch 1300/2676.
Processed batch 1400/2676.
Processed batch 1500/2676.
Processed batch 1600/2676.
Processed batch 1700/2676.
Processed batch 1800/2676.
Processed batch 1900/2676.
Processed batch 2000/2676.
Processed batch 2100/2676.
Processed batch 2200/2676.
Processed batch 2300/2676.
Processed batch 2400/2676.
Processed batch 2500/2676.
Processed batch 2600/2676.
Query completed. Results saved to 'specobj_results.csv'.


In [2]:
specobj = pd.read_csv('/Users/tharacaba/Downloads/ProjectBig/specobj_results.csv')
specobj

Unnamed: 0,plate,mjd,fiberid,bestObjID
0,303,51615,205,1237648703516442877
1,299,51671,179,1237648704049840359
2,300,51943,309,1237648704586973290
3,311,51665,443,1237648705670152326
4,364,52000,380,1237648705678213667
...,...,...,...,...
133789,7572,56944,710,1237680273108960003
133790,6265,56248,512,1237680285996155390
133791,6295,56536,434,1237680303715582376
133792,6515,56536,932,1237680308016841309


In [11]:
# Check for duplicates in original_ext_source_id
duplicate_ids = crossmatch[crossmatch.duplicated(subset='original_ext_source_id', keep=False)]

if duplicate_ids.empty:
    print("All IDs in 'original_ext_source_id' are unique.")
else:
    print(f"Found {len(duplicate_ids)} duplicate entries in 'original_ext_source_id'.")
    print("Duplicate entries:")
    print(duplicate_ids)

Found 77 duplicate entries in 'original_ext_source_id'.
Duplicate entries:
                  SOURCE_ID  original_ext_source_id
91       921068247868697728     1237653587407536295
92       921068243577055232     1237653587407536295
349      706252331821810944     1237660961862189211
350      706252331823294464     1237660961862189211
406     1541291590181192320     1237661852007727118
...                     ...                     ...
131800   147929260770079872     1237660558135656804
132095   148055807685252352     1237660559209332868
132096   148055807686085120     1237660559209332868
132667  3706417688227261568     1237661970652397647
132668  3706417692523377408     1237661970652397647

[77 rows x 2 columns]


In [None]:
# Load the spectra information
spectra_info = specobj  # Replace with your actual DataFrame

# Define the output directory for spectra
output_dir = "/Users/tharacaba/Downloads/ProjectBig/Spectra"
parquet_dir = "/Users/tharacaba/Downloads/ProjectBig/Spectra_Parquet"  # Directory for Parquet files
os.makedirs(output_dir, exist_ok=True)  # Create the directory if it doesn't exist

# Define batch size and retry settings
batch_size = 1000  # Number of spectra to process in one batch
max_retries = 3  # Number of retries for failed downloads

# Initialize a list to log failed spectra
failed_spectra = []

# Process spectra in batches
for batch_start in range(0, len(spectra_info), batch_size):
    # Get the current batch
    batch = spectra_info.iloc[batch_start:batch_start + batch_size]
    print(f"Processing batch {batch_start // batch_size + 1} ({batch_start} to {batch_start + len(batch)} entries)...")
    
    for idx, row in batch.iterrows():
        plate, mjd, fiberid = row['plate'], row['mjd'], row['fiberid']
        filename = os.path.join(output_dir, f"spectrum_plate{plate}_mjd{mjd}_fiber{fiberid}.fits")
        
        # Define Parquet filename
        parquet_file = os.path.join(parquet_dir, f"spectrum_plate{plate}_mjd{mjd}_fiber{fiberid}.parquet")
        
        # Skip download if Parquet file already exists
        if os.path.exists(parquet_file):
            continue
        
        # Retry mechanism
        retries = 0
        success = False
        while retries < max_retries and not success:
            try:
                spectrum = SDSS.get_spectra(plate=plate, mjd=mjd, fiberID=fiberid)
                if spectrum:
                    # Save the spectrum to the output directory
                    spectrum[0].writeto(filename, overwrite=True)
                    success = True
                else:
                    print(f"  Spectrum not found: plate={plate}, mjd={mjd}, fiberid={fiberid}")
                    retries = max_retries  # Exit retry loop as it's a permanent failure
            except Exception as e:
                retries += 1
                print(f"  Error downloading spectrum: plate={plate}, mjd={mjd}, fiberid={fiberid}. Error: {e}. Retrying...")
                time.sleep(2)  # Short delay before retrying
        
        if not success:
            failed_spectra.append({'plate': plate, 'mjd': mjd, 'fiberid': fiberid})
            print(f"  Failed to download spectrum after {max_retries} retries: plate={plate}, mjd={mjd}, fiberid={fiberid}")
    
    print(f"Finished processing batch {batch_start // batch_size + 1}.")

# Save the failed spectra log
if failed_spectra:
    failed_df = pd.DataFrame(failed_spectra)
    failed_df.to_csv("failed_spectra.csv", index=False)
    print(f"Failed spectra logged to 'failed_spectra.csv'.")

print("All spectra processing completed.")

Processing batch 1 (0 to 1000 entries)...
Finished processing batch 1.
Processing batch 2 (1000 to 2000 entries)...
Finished processing batch 2.
Processing batch 3 (2000 to 3000 entries)...
Finished processing batch 3.
Processing batch 4 (3000 to 4000 entries)...
Finished processing batch 4.
Processing batch 5 (4000 to 5000 entries)...
Finished processing batch 5.
Processing batch 6 (5000 to 6000 entries)...
Finished processing batch 6.
Processing batch 7 (6000 to 7000 entries)...
Finished processing batch 7.
Processing batch 8 (7000 to 8000 entries)...
Finished processing batch 8.
Processing batch 9 (8000 to 9000 entries)...
Finished processing batch 9.
Processing batch 10 (9000 to 10000 entries)...
Finished processing batch 10.
Processing batch 11 (10000 to 11000 entries)...
Finished processing batch 11.
Processing batch 12 (11000 to 12000 entries)...
Finished processing batch 12.
Processing batch 13 (12000 to 13000 entries)...
Finished processing batch 13.
Processing batch 14 (13000

MC1TEMDN=-0.00000000000000E+00 / sp1 mech median temp                            [astropy.io.fits.card]
MC1TBCT =-0.00000000000000E+00 / sp1 mech Temp_Blue_Cam_Top                      [astropy.io.fits.card]
MC1TEMDN=-0.00000000000000E+00 / sp1 mech median temp                            [astropy.io.fits.card]
MC1TBCT =-0.00000000000000E+00 / sp1 mech Temp_Blue_Cam_Top                      [astropy.io.fits.card]


  Error downloading spectrum: plate=9353, mjd=57814, fiberid=592. Error: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response')). Retrying...
  Error downloading spectrum: plate=4261, mjd=55503, fiberid=656. Error: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response')). Retrying...
Finished processing batch 64.
Processing batch 65 (64000 to 65000 entries)...
  Error downloading spectrum: plate=5329, mjd=55946, fiberid=270. Error: HTTPSConnectionPool(host='skyserver.sdss.org', port=443): Read timed out. (read timeout=60). Retrying...
  Error downloading spectrum: plate=471, mjd=51924, fiberid=386. Error: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response')). Retrying...
  Error downloading spectrum: plate=5732, mjd=56326, fiberid=820. Error: HTTPSConnectionPool(host='skyserver.sdss.org', port=443): Read timed out. (read timeout=60). Retrying...
  Error downloading spectrum: 

In [6]:
# Define directories
fits_dir = "/Users/tharacaba/Downloads/ProjectBig/Spectra"  # Directory with FITS files
parquet_dir = "/Users/tharacaba/Downloads/ProjectBig/Spectra_Parquet"  # Directory for Parquet files
os.makedirs(parquet_dir, exist_ok=True)  # Ensure the Parquet directory exists

# Load the specObj list
spectra_info = specobj  # Replace with your file

# Define batch size and retry settings
batch_size = 1000  # Number of spectra to process in one batch
max_retries = 3  # Number of retries for failed downloads

# Process existing FITS files
print("Processing existing FITS files...")
for fits_file in os.listdir(fits_dir):
    if fits_file.endswith(".fits"):
        # Extract plate, mjd, and fiberid from the filename
        file_parts = fits_file.split("_")
        plate = file_parts[1][5:]
        mjd = file_parts[2][3:]
        fiberid = file_parts[3][5:].replace(".fits", "")
        
        # Define Parquet filename
        parquet_file = os.path.join(parquet_dir, f"spectrum_plate{plate}_mjd{mjd}_fiber{fiberid}.parquet")
        
        # Skip if Parquet file already exists
        if os.path.exists(parquet_file):
            continue
        
        # Open FITS file and extract data
        fits_path = os.path.join(fits_dir, fits_file)
        try:
            with fits.open(fits_path) as hdul:
                data = Table(hdul[1].data)
                
                # Convert to native byte order
                flux = np.array(data["flux"]).byteswap().newbyteorder()
                loglam = np.array(data["loglam"]).byteswap().newbyteorder()
            
            # Save to Parquet
            df = pd.DataFrame({"loglam": loglam, "flux": flux})
            df.to_parquet(parquet_file, index=False)
            
            # Delete the FITS file
            os.remove(fits_path)
            #print(f"Processed and deleted: {fits_file}")
        except Exception as e:
            print(f"Error processing {fits_file}: {e}")
print("Finished processing existing FITS files.")

Processing existing FITS files...
Finished processing existing FITS files.


In [22]:
import os
import pandas as pd
import numpy as np

# Define directories
parquet_dir = "/Users/tharacaba/Downloads/ProjectBig/Spectra_Parquet"  # Directory for Parquet files

# Load the DataFrame with bestObjID, plate, mjd, and fiberid
metadata_df = specobj  # Replace with your metadata file

# Initialize a list to store results
results = []

# Process each Parquet file
for parquet_file in os.listdir(parquet_dir):
    if parquet_file.endswith(".parquet"):
        # Extract plate, mjd, and fiberid from the filename
        file_parts = parquet_file.split("_")
        plate = int(file_parts[1][5:])
        mjd = int(file_parts[2][3:])
        fiberid = int(file_parts[3][5:].replace(".parquet", ""))
        
        # Load the Parquet file
        parquet_path = os.path.join(parquet_dir, parquet_file)
        try:
            data = pd.read_parquet(parquet_path)
            
            # Convert loglam to lam
            data['lam'] = 10 ** data['loglam']
            
            # Define wavelength ranges
            range1 = (3300, 6800)  # First range
            range2 = (6400, 9150)  # Second range
            
            # Calculate integrated flux for each range
            mask1 = (data['lam'] >= range1[0]) & (data['lam'] <= range1[1])
            mask2 = (data['lam'] >= range2[0]) & (data['lam'] <= range2[1])
            
            flux1 = np.trapz(data.loc[mask1, 'flux'], data.loc[mask1, 'lam'])
            flux2 = np.trapz(data.loc[mask2, 'flux'], data.loc[mask2, 'lam'])
            
            # Calculate the flux ratio
            flux_ratio = flux1 / flux2 if flux2 != 0 else np.nan
            
            # Find the bestObjID from metadata
            matched_row = metadata_df[
                (metadata_df['plate'] == plate) & 
                (metadata_df['mjd'] == mjd) & 
                (metadata_df['fiberid'] == fiberid)
            ]
            
            if not matched_row.empty:
                bestObjID = matched_row['bestObjID'].values[0]
                results.append({'bestObjID': bestObjID, 'plate': plate, 'mjd': mjd, 'fiberid': fiberid, 'flux_ratio': flux_ratio})
            else:
                print(f"No match found for Parquet file: {parquet_file}")
        except Exception as e:
            print(f"Error processing {parquet_file}: {e}")
        finally:
            # Delete the Parquet file after processing
            os.remove(parquet_path)
            #print(f"Deleted Parquet file: {parquet_file}")

# Create a DataFrame with the results
results_df = pd.DataFrame(results)

# Save the results to a CSV file
results_df.to_csv("flux_ratios.csv", index=False)

print("Flux ratios calculated, saved to 'flux_ratios.csv', and Parquet files deleted.")

Flux ratios calculated, saved to 'flux_ratios.csv', and Parquet files deleted.


In [23]:
results_df

Unnamed: 0,bestObjID,plate,mjd,fiberid,flux_ratio
0,1237655693011845153,918,52404,515,1.028455
1,1237665024914423991,2121,54180,475,1.027186
2,1237667429022629905,2295,53734,201,1.742462
3,1237668272448733204,2768,54265,520,0.898819
4,1237662236931457149,1233,52734,185,1.587512
...,...,...,...,...,...
2735,1237668294982565897,2490,54179,322,1.160761
2736,1237655498126852309,910,52377,189,1.303406
2737,1237651752391213235,275,51910,324,1.141400
2738,1237663917882802416,1876,54464,491,1.116426
